flink: 从pulsar中读取数据

一、依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>pulsar-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><pulsar.version>3.2.0</pulsar.version></properties><!-- in your <properties> block --><!-- in your <dependencies> block --><dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-all</artifactId><version>${pulsar.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.13.6</version></dependency><!-- https://mvnrepository.com/artifact/io.streamnative.connectors/pulsar-flink-connector --><dependency><groupId>io.streamnative.connectors</groupId><artifactId>pulsar-flink-connector_2.11</artifactId><version>1.13.6.1-rc13</version><exclusions><exclusion><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-all</artifactId></exclusion></exclusions></dependency></dependencies></project>

二、demo程序

package cn.edu.tju.test2;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;import java.util.Properties;public class FlinkPulsar01 {private static String SERVER_4 = "xx.xx.xx.xx";public static void main(String[] args) throws Exception {//创建flink的流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();//配置pulsarString serviceUrl = "pulsar://" + SERVER_4 + ":6650";String adminUrl = "http://" + SERVER_4 + ":8080";Properties properties = new Properties();properties.put("topic", "persistent://public/default/my-topic");//创建flink sourceFlinkPulsarSource<String> source = new FlinkPulsarSource<String>(serviceUrl, adminUrl,PulsarDeserializationSchema.valueOnly(new SimpleStringSchema()), properties);//从topic 最早的数据开始消费source.setStartFromEarliest();DataStreamSource<String> dataStreamSource = env.addSource(source);//打印数据流dataStreamSource.print();//执行jobenv.execute("my job");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/775231.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

配置小程序的服务器域名

准备工作 拥有一个已注册的域名&#xff1a;确保您已经注册了一个符合国家和地区相关法律法规要求的域名。 完成域名备案&#xff08;如有必要&#xff09;&#xff1a;根据国家和地区的法律法规&#xff0c;某些情况下可能需要对域名进行备案才能用于互联网服务。 配置 DNS&…

Vue 二次封装组件的艺术与实践

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

备考ICA----Istio实验9---熔断Circuit Breaking 实验

备考ICA----Istio实验9—熔断Circuit Breaking 实验 1. 环境准备 创建httpbin环境 kubectl apply -f istio/samples/httpbin/httpbin.yaml kubectl get svc httpbin2. 创建测试用客户端 kubectl apply -f istio/samples/httpbin/sample-client/fortio-deploy.yaml3. 创建Ht…

python笔记进阶--模块、文件及IO操作(1)

目录 一&#xff0e;模块 1.模块的导入和使用 1.1导入整个模块 1.2导入函数 1.3使用as给模块指定别名 2.常见标准库 2.1 import random&#xff1a; 2.2 import math&#xff1a; 2.3正则表达式处理 2.4turtle 二&#xff0e;文件及IO操作 1.文件 1.1绝对路径与相…

Intellij IDEA 类注释模板设置

1、配置全局USER 在此配置全局USER&#xff0c;用于填充自动生成的注释中的作者author属性。 注释模板中的user参数是默认是获取系统的用户&#xff08;当然注释作者也可以直接写固定值&#xff09;&#xff0c;如果不想和系统用户用同一个信息&#xff0c;可以在IDEA中进行配…

【自我提升】一、Hyperledger Fabric 概念梳理

写在前面&#xff1a;最近因为业务需要&#xff0c;开始学习Hyperledger Fabric了&#xff0c;做java全栈工程师可真难搞。现在算是啥类型的都在涉及了&#xff0c;现在这个技术啥都不懂&#xff0c;就先开个学习专栏&#xff0c;记录记录。顺带也给各位道友参考参考。 目录 …

「媒体宣传」媒体邀约几种常见方法!-51媒体

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 媒体邀约的常见方法确实包括电话邀约、邮件邀约、社交媒体邀约以及通过媒体公关公司代邀约等。 电话邀约&#xff1a;这是一种直接且高效的方式&#xff0c;可以通过电话与媒体记者沟通&…

HTTP请求走私!!!(一)

想都是问题&#xff0c;做才是答案 什么是请求走私&#xff1f; HTTP请求走私是针对于服务端处理一个或者多个接收http请求序列的方式&#xff0c;进行绕过安全机制&#xff0c;实施未授权访问一种攻击手段&#xff0c;获取敏感信息&#xff0c;并直接危害其他用户。 Web 应用…

【前端学习——css篇】4.px和rem的区别

https://github.com/febobo/web-interview 4.px和rem的区别 ①px px&#xff0c;表示像素&#xff0c;所谓像素就是呈现在我们显示器上的一个个小点&#xff0c;每个像素点都是大小等同的&#xff0c;所以像素为计量单位被分在了绝对长度单位中 有些人会把px认为是相对长度&…

大语言模型(LLM)token解读

1. 什么是token&#xff1f; 人们经常在谈论大模型时候&#xff0c;经常会谈到模型很大&#xff0c;我们也常常会看到一种说法&#xff1a; 参数会让我们了解神经网络的结构有多复杂&#xff0c;而token的大小会让我们知道有多少数据用于训练参数。 什么是token&#xff1f;比…

浅聊openGauss逻辑架构

浅聊 openGauss 逻辑架构 概述 openGauss 数据库是一款由华为主导、各个生态合作伙伴共同建设的开源关系型数据库管理系统&#xff0c;开源发行协议遵从木兰宽松许可证 v2。 openGauss 数据库源于 PostgreSQL-XC 项目&#xff0c;内核源于 Postgres 9.2.4&#xff0c;总代码…

mybatis注解方式if标签报错元素内容必须由格式正确的字符数据或标记组成

在使用mybatis的注解方式的时候出现个问题&#xff0c;我需要一个复杂的sql语句&#xff0c;既有if判断又有in语句&#xff0c;刚开始使用mybatis自己的if动态函数的时候完全没问题&#xff0c;代码如下&#xff1a; Select({"select * ","from order_info &qu…

利用python脚本,根据词条爬取百度图片(爬虫)

把广角&#xff0c;换成你的关键词就行 # -*- coding: utf-8 -*- """ Created on Wed Mar 29 10:17:50 2023 author: MatpyMaster """ import requests import os import redef get_images_from_baidu(keyword, page_num, save_dir):header {Us…

Hadoop+Spark大数据技术 第三次作业

第三次作业 1.简述HDFS Shell三种操作命令hadoop fs、hadoop dfs、hdfs dfs的异同点。 相同点 用于与 Hadoop 分布式文件系统&#xff08;HDFS&#xff09;交互。可以执行各种文件系统操作&#xff0c;如文件复制、删除、移动等。 不同点 hadoop fs、hadoop dfs已弃用&#xf…

使用vue构建一个简单实用的春节红包插件!

摘要&#xff1a;本文将介绍如何使用Vue.js构建一个简单实用的春节红包插件。该插件通过模拟红包的打开和关闭过程&#xff0c;以及金额的随机分配&#xff0c;为春节红包活动提供了一个有趣且互动的体验。 一、引言 在春节这个充满欢乐和祝福的时刻&#xff0c;红包成为了传递…

Encoding类

Encoding System.Text.Encoding 是 C# 中用于处理字符编码和字符串与字节之间转换的类。它提供了各种静态方法和属性&#xff0c;**用于在不同字符编码之间进行转换&#xff0c;**以及将字符串转换为字节数组或反之。 在处理多语言文本、文件、网络通信以及其他字符数据的场景…

node.js项目初始化操作

项目环境Vscode 1.新建一个文件夹node.js(xx.js) 2.右键点击node.js&#xff0c;点击打开终端 我在VScode打开终端 输入npm init初始化项目没反应。 解决方法&#xff1a;进入文件夹node.js&#xff0c;出入cmd跳转到终端 重新输入npm init命令 正确结果如下图 后续命令按下…

【Leetcode】2580. 统计将重叠区间合并成组的方案数

文章目录 题目思路代码复杂度分析时间复杂度空间复杂度 结果总结 题目 题目链接&#x1f517; 给你一个二维整数数组 ranges &#xff0c;其中 ranges[i] [starti, endi] 表示 starti 到 endi 之间&#xff08;包括二者&#xff09;的所有整数都包含在第 i 个区间中。 你需要…

Tunes不能读取iPhone的内容,请前往iPhone偏好设置的摘要选项卡,然后单击恢复以将此iPhone恢复为出厂设置

重启itunes: 参考链接&#xff1a; https://baijiahao.baidu.com/s?id1642568736254330322&wfrspider&forpc 人工智能学习网站&#xff1a; https://chat.xutongbao.top

什么是solana PDA账户?

Solana 设计的一个核心理念是万物皆账户&#xff0c;Solana 上的几乎所有数据都可以表示为账户。 在 Solana 上&#xff0c;不仅仅是用户的钱包地址拥有账户&#xff0c;还包括智能合约、数据结构、代币、NFT 等。这些账户可以持有各种类型的数据&#xff0c;包括数字资产、智…