11_Pulsar Adaptors适配器、kafka适配器、Spark适配器

2.3. Pulsar Adaptors适配器
2.3.1.kafka适配器
2.3.2.Spark适配器

2.3. Pulsar Adaptors适配器

2.3.1.kafka适配器

Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。
在生产者中, 如果想不改变原有kafka的代码架构, 就切换到Pulsar的平台中, 那么Pulsar adaptor on kafka就变的非常的有用了, 它可以帮助我们在不改变原有kafka的代码基础上, 即可接入pulsar, 但是需要注意, 相关配置信息需要进行一些调整, 例如: 地址与topic

  • 1- 需要导入Pulsar集成kafka的依赖包, 删除掉原有Kafka-client包
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-kafka</artifactId> <version>2.8.0</version> 
</dependency>

注: 目前Pulsar并在Maven中央仓库中并没有提供Pulsar-client-kafka 2.8.1的包, 故此处导入2.8.0

  • 2-编写生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaAdaptorProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//1. 创建kafka生产者的核心类对象: KafkaProducer// 1.1: 创建生产者配置对象: 设置相关配置Properties props = new Properties();props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");// 消息的确认方案props.put("acks", "all");// key序列化类型props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value 序列化类型props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props); //2. 发送数据 for (int i = 0; i < 10; i++) { //2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i)); producer.send(producerRecord).get(); }//3. 释放资源 producer.close();}}
  • 3-编写消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class KafkaAdaptorConsumer {public static void main(String[] args) {//1. 创建kafka的消费者的核心对象: KafkaConsumer//1.1: 创建消费者配置对象, 并设置相关的参数:Properties props = new Properties();props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");//消费者组的 idprops.setProperty("group.id", "test");//是否启动消费者自动提交消费偏移量props.setProperty("enable.auto.commit", "true");//每间隔多长时间提交一次偏移量:单位 毫秒props.setProperty("auto.commit.interval.ms","1000");//key 反序列化props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//val 发序列化props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//2. 给消费者设置订阅topic:consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1"));//3. 循环获取相关的消息数据while (true) {//3.1: 从kafka中获取消息数据: 参数表示等待超时时间//注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));//3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 一个对象就是一条消息for (ConsumerRecord<String, String> record : records) {String massage = record.value();System.out.println("消息数据为:"+massage);}} } 
}
  • 4- 先运行消费者, 进行监听, 然后运行生产者, 观察消费者是否可以正常消费到数据
    在这里插入图片描述

2.3.2.Spark适配器

Pulsar 的 Spark Streaming 接收器是一个自定义的接收器,它使用 Apache Spark Streaming 能够从 Pulsar 接
收原始数据。

应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可
以通过多种方式对其进行处理。

  • 1-导入相关的依赖包
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-spark</artifactId><version>2.8.0</version>
</dependency>
  • 2-编写spark的流式代码
String serviceUrl = "pulsar://localhost:6650/"; 
String topic = "persistent://public/default/test_src"; 
String subs = "test_sub"; 
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example"); 
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60)); 
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData(); 
Set<String> set = new HashSet<>(); 
set.add(topic); 
pulsarConf.setTopicNames(set); 
pulsarConf.setSubscriptionName(subs); 
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver( 
serviceUrl, 
pulsarConf, 
new AuthenticationDisabled()); 
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28493.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FreeRTOS通过消息队列实现串口命令解析(串口中断)

作者&#xff1a;Jack_G 时间&#xff1a;2023.08.08 版本&#xff1a;V1.0 上次修改时间&#xff1a; 环境&#xff1a; \quad \quad \quad \quad STM32Cube MX V6.8.1 \quad \quad \quad \quad STM32CubeH7 Firmware Package V1.11.0 / 04-Nov-2022 \quad \quad \quad \qu…

[SQL智慧航行者] - 用户购买商品推荐

话不多说, 先看数据表信息. 数据表信息: employee 表, 包含所有员工信息, 每个员工有其对应的 id, salary 和 departmentid. --------------------------------- | id | name | salary | departmentid | --------------------------------- | 1 | Joe | 70000 | 1 …

抖音的竞争对手?Meta计划人工智能聊天机器人增加社交媒体数量

在来自抖音的竞争中&#xff0c;Meta着眼于用户参与的下一个前沿。 报道&#xff0c;Meta正在开发一系列具有不同个性的人工智能聊天机器人&#xff0c;此举旨在增加用户在脸书和Instagram等社交平台上的参与度金融时报和边缘。这些聊天机器人被Meta staff称为“personas ”,将…

LabVIEW开发高压配电设备振动信号特征提取与模式识别

LabVIEW开发高压配电设备振动信号特征提取与模式识别 矿用高压配电设备是井下供电系统中的关键设备之一&#xff0c;肩负着井下供配电和供电安全的双重任务&#xff0c;其工作状态直接影响着井下供电系统的安全性和可靠性。机械故障占配电总故障的70%。因此&#xff0c;机械故…

代理模式及常见的3种代理类型对比

代理模式及常见的3种代理类型对比 代理模式代理模式分类静态代理JDK动态代理CGLIBFastclass机制 三种代理方式之间对比常见问题 代理模式 代理模式是一种设计模式&#xff0c;提供了对目标对象额外的访问方式&#xff0c;即通过代理对象访问目标对象&#xff0c;这样可以在不修…

嵌入式开发实用工具——QFSViewer

嵌入式开发实用工具——QFSViewer 介绍 今天给大家推荐个我个人业余时间开发的一个嵌入式开发实用工具——QFSViewer&#xff0c;这个工具主要是用来加载查看各种嵌入式常用的文件系统映像&#xff0c;目前支持JFSS2、Fat32、Fat16、Fat12、exFat、Ext2、Ext3、Ext4等文件系统…

用栈判断是否匹配

1 问题 写代码的时候用到的括号都是成双成对的出现&#xff0c;并且大小也相同。在集成编辑环境中&#xff0c;IDE就会为我们自己动检查括号是否匹配。那么为了避免在报错&#xff0c;如何判断是否有无括号不匹配&#xff1f; 2 方法 利用栈来实现这种功能。当遇见一个左括号&a…

【Linux命令行与Shell脚本编程】 第十七章 图形化桌面环境脚本编程

Linux命令行与Shell脚本编程 第十七章 图形化桌面环境脚本编程 文章目录 Linux命令行与Shell脚本编程七.图形化桌面环境脚本编程7.1.创建文本菜单7.1.1.创建菜单布局7.1.2.创建菜单逻辑7.1.3.整合脚本菜单7.1.4.使用select命令 7.2.创建文本窗口部件7.2.1.dialog软件包部件msg…

wpf 项目中使用 Prism + MaterialDesign

1.通过nuget安装MaterialDesign 2.通过nuget安装Prism 3.修改App.xmal <prism:PrismApplication x:Class"VisionMeasureGlue.App"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/…

24华东交通软件工程837考研题库

1&#xff0e;Jackson设计方法是由英国的M&#xff0e;Jackson所提出的。它是一种面向( )的软件设 计方法。 A&#xff0e;对象 B&#xff0e;数据流 C&#xff0e;数据结构 D&#xff0e;控制结构 答案:C 2&#xff0e;软件设计中&#xff0c;Jackson方法是一种面向…

66 # form 数据格式化

实现一个 http 服务器 客户端会发送请求 GET POST 要处理不同的请求体的类型 表单格式&#xff08;formData a1&b2&#xff09;&#xff0c;可以直接通信不会出现跨域问题JSON &#xff08;"{"kaimo":"313"}"&#xff09;文件格式 &#x…

Android 项目导入高德SDK初次上手

文章目录 一、前置知识&#xff1a;二、学习目标三、学习资料四、操作过程1、创建空项目2、高德 SDK 环境接入2.1 获取高德 key2.2下载 SDK 并导入2.2.1、下载SDK 文件2.2.2、SDK 导入项目2.2.3、清单文件配置2.2.4、隐私权限 3、显示地图 一、前置知识&#xff1a; 1、Java 基…

数据结构----算法--分治,快速幂

数据结构----算法–分治,快速幂 一.分治 1.分治的概念 分治法&#xff1a;分而治之 将一个问题拆解成若干个解决方式完全相同的问题 满足分治的四个条件 1.问题难度随着数据规模缩小而降低 2.问题可拆分 3.子问题间相互独立 4.子问题的解可合并 2.二分查找(折半搜索)…

移动端自动化测试实战

UI自动化测试的价值 1、提升回归测试的效率 2、可以进行兼容性测试 UI 自动化测试应用场景 • 冒烟测试自动化&#xff1a;提测之前自动断言提测质量&#xff0c;提供准入参考。 • 功能测试自动化&#xff1a;辅助 QA 与测试工程师的快速验证。 • 验收测试自动化&#xf…

stable-diffusion-webui 界面汉化

本教程通过安装 sd-webui-bilingual-localization 插件来达到汉化目的, 项目地址为:https://github.com/journey-ad/sd-webui-bilingual-localization 一、安装插件 先进入插件安装界面 在搜索栏搜索 zh_CN Localization 中文语言包, 项目地址: https://github.com/dtlnor/st…

CrossOver是什么软件 CrossOver软件好用吗

CrossOver是一款由CodeWeavers公司开发的软件&#xff0c;它可以在Mac和Linux等操作系统上运行Windows软件&#xff0c;而无需在计算机上安装Windows操作系统。这款软件的核心技术是Wine&#xff0c;它是一种在Linux和macOS等操作系统上运行Windows应用程序的开源软件。本文将会…

mysql按季度查询数据

select * from 表名 where YEAR(add_time) 2023 and QUARTER(add_time) 1

使用docker 搭建nginx + tomcat 集群

创建3个Tomcat容器&#xff0c;端口分别映射到 8080,8081,8082&#xff0c;使用数据卷挂载&#xff0c;分别将宿主机目录下的 /opt/module/docker/tomcat3/ROOT1/&#xff0c;/opt/module/docker/tomcat3/ROOT2/&#xff0c;/opt/module/docker/tomcat3/ROOT2/ 挂载到 容器内部…

STM32外部中断

配置是stm32外部中断步骤&#xff1a; 1.使能IO时钟和AFIO时钟 2.配置EXIT中断线 3.配置中断控制器NVIC (EXTI15_10_IRQn,EXTI4_IRQn&#xff09; 4.写中断服务子程序 &#xff08;EXTI15_10_IRQHandler&#xff09; 首先要明白中断IO对应的中断线EXTIx 其中 PA0-PE0对应的是E…

CAD练习——绘制冲压件三视图

首先还是先设置咱们的绘图模板&#xff1a; 这是图层划分&#xff1a; 文字样式设置&#xff1a; 标注样式&#xff1a; 从主视图开始&#xff0c;首先绘制如下图形 用到的快捷指令&#xff1a; L&#xff1a;直线 O&#xff1a;偏移 TR&#xff1a;修剪 效果&#xff1a;…