14_基于Flink将pulsar数据写入到HBase

3.7.基于Flink将数据写入到HBase

3.7.1.编写Flink完成数据写入到Hbase操作, 完成数据备份, 便于后续进行即席查询和离线分析

3.7.1.1.HBase基本介绍

hbase是基于Google发布bigTable论文产生一款软件, 是一款noSQL型数据, 不支持SQL. 不支持join的操作, 没有表关系, 不支持事务(多行事务),hbase是基于 HDFS的采用java 语言编写

查询hbase数据一般有三种方案(主键(row key)查询, 主键的范围检索,查询全部数据)

都是以字节类型存储,存储结构化和半结构化数据。

hbase表的特点: 大 面向列的存储方案 稀疏性

2.7.1.2.应用场景

1)需要进行随机读写的操作。
2)数据量比较大。
3)数据比较稀疏。

2.7.1.3.HBase安装操作

本次安装的HBase为2.2.7,详细的安装手册大家可以参考资料, 还需要大家注意,HBase的启动需要依赖于zookeeper
和HDFS的, 顾需要先安装 HADOOP与zookeeper
在这里插入图片描述

  • 1-在Hbase中创建目标表
create 'itcast_h_ems, {NAME=>'f1',COMPRESSION=>'GZ'},{NUMREGIONS=>6, SPLITALGO=>'HexStringSplit'}
  • 2- 编写Flink代码完成写入Hbase操作
import com.itheima.pojo.PulsarTopicPojo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonDeser;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.util.Properties;// 基于Flink消费Pulsar数据, 然后将数据灌入到HBase中, 完成数据备份, 以及后续即席查询和离线分析
public class ItcastFlinkToHBase {public static void main(String[] args) throws Exception {//1. 创建Flinnk流式处理核心环境类对象 和 Table API 核心环境类对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2. 添加Source组件, 从Pulsar中读取消息数据Properties props = new Properties();props.setProperty("topic","persistent://public/default/itcast_ems_tab");props.setProperty("partition.discovery.interval-millis","5000");FlinkPulsarSource<PulsarTopicPojo> pulsarSource = new FlinkPulsarSource<PulsarTopicPojo>("pulsar://node1:6650,node2:6650,node3:6650","http://node1:8080,node2:8080,node3:8080",JsonDeser.of(PulsarTopicPojo.class),props);//2.1 设置pulsarSource组件在消费数据的时候, 默认从什么位置开始消费pulsarSource.setStartFromLatest();DataStreamSource<PulsarTopicPojo> dataStreamSource = env.addSource(pulsarSource);//2.2 转换为Flink TableSchema schema = Schema.newBuilder().column("id", DataTypes.INT()).column("sid", DataTypes.STRING()).column("ip", DataTypes.STRING()).column("session_id", DataTypes.STRING()).column("create_time", DataTypes.STRING()).column("yearInfo", DataTypes.STRING()).column("monthInfo", DataTypes.STRING()).column("dayInfo", DataTypes.STRING()).column("hourInfo", DataTypes.STRING()).column("seo_source", DataTypes.STRING()).column("area", DataTypes.STRING()).column("origin_channel", DataTypes.STRING()).column("msg_count", DataTypes.INT()).column("from_url", DataTypes.STRING()).build();tableEnv.createTemporaryView("itcast_ems",dataStreamSource,schema);//2.3: 定义HBase的目标表String hTable = "create table itcast_h_ems("+"rowkey int,"+"f1 ROW<sid STRING,ip STRING,session_id STRING,create_time STRING,yearInfo STRING,monthInfo STRING,dayInfo STRING,hourInfo STRING,seo_source STRING,area STRING,origin_channel STRING,msg_count INT,from_url STRING>,"+"primary key(rowkey) NOT ENFORCED" +") WITH ("+"'connector'='hbase-2.2',"+"'table-name'='itcast_h_ems',"+"'zookeeper.quorum'='node1:2181,node2:2181,node3:2181'"+")";//4. 执行操作tableEnv.executeSql(hTable);tableEnv.executeSql("insert into itcast_h_ems select id,ROW(sid,ip,session_id,create_time,yearInfo,monthInfo,dayInfo,hourInfo,seo_source,area,origin_channel,msg_count,from_url) from itcast_ems");}}

PulsarTopicPojo

public class PulsarTopicPojo {private Integer id;private String sid;private String ip;private String session_id;private String create_time;private String yearInfo;private String monthInfo;private String dayInfo;private String hourInfo;private String seo_source;private String area;private String origin_channel;private Integer msg_count;private  String from_url;public PulsarTopicPojo() {}public PulsarTopicPojo(Integer id, String sid, String ip, String session_id, String create_time, String yearInfo, String monthInfo, String dayInfo, String hourInfo, String seo_source, String area, String origin_channel, Integer msg_count, String from_url) {this.id = id;this.sid = sid;this.ip = ip;this.session_id = session_id;this.create_time = create_time;this.yearInfo = yearInfo;this.monthInfo = monthInfo;this.dayInfo = dayInfo;this.hourInfo = hourInfo;this.seo_source = seo_source;this.area = area;this.origin_channel = origin_channel;this.msg_count = msg_count;this.from_url = from_url;}public void setData(Integer id, String sid, String ip, String session_id, String create_time, String yearInfo, String monthInfo, String dayInfo, String hourInfo, String seo_source, String area, String origin_channel, Integer msg_count, String from_url) {this.id = id;this.sid = sid;this.ip = ip;this.session_id = session_id;this.create_time = create_time;this.yearInfo = yearInfo;this.monthInfo = monthInfo;this.dayInfo = dayInfo;this.hourInfo = hourInfo;this.seo_source = seo_source;this.area = area;this.origin_channel = origin_channel;this.msg_count = msg_count;this.from_url = from_url;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getSid() {return sid;}public void setSid(String sid) {this.sid = sid;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public String getSession_id() {return session_id;}public void setSession_id(String session_id) {this.session_id = session_id;}public String getCreate_time() {return create_time;}public void setCreate_time(String create_time) {this.create_time = create_time;}public String getYearInfo() {return yearInfo;}public void setYearInfo(String yearInfo) {this.yearInfo = yearInfo;}public String getMonthInfo() {return monthInfo;}public void setMonthInfo(String monthInfo) {this.monthInfo = monthInfo;}public String getDayInfo() {return dayInfo;}public void setDayInfo(String dayInfo) {this.dayInfo = dayInfo;}public String getHourInfo() {return hourInfo;}public void setHourInfo(String hourInfo) {this.hourInfo = hourInfo;}public String getSeo_source() {return seo_source;}public void setSeo_source(String seo_source) {this.seo_source = seo_source;}public String getArea() {return area;}public void setArea(String area) {this.area = area;}public String getOrigin_channel() {return origin_channel;}public void setOrigin_channel(String origin_channel) {this.origin_channel = origin_channel;}public Integer getMsg_count() {return msg_count;}public void setMsg_count(Integer msg_count) {this.msg_count = msg_count;}public String getFrom_url() {return from_url;}public void setFrom_url(String from_url) {this.from_url = from_url;}@Overridepublic String toString() {return "PulsarTopicPojo{" +"id=" + id +", sid='" + sid + '\'' +", ip='" + ip + '\'' +", session_id='" + session_id + '\'' +", create_time='" + create_time + '\'' +", yearInfo='" + yearInfo + '\'' +", monthInfo='" + monthInfo + '\'' +", dayInfo='" + dayInfo + '\'' +", hourInfo='" + hourInfo + '\'' +", seo_source='" + seo_source + '\'' +", area='" + area + '\'' +", origin_channel='" + origin_channel + '\'' +", msg_count=" + msg_count +", from_url='" + from_url + '\'' +'}';}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/35403.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

链表OJ详解

&#x1f495;人生不满百&#xff0c;常怀千岁忧&#x1f495; 作者&#xff1a;Mylvzi 文章主要内容&#xff1a;链表oj详解 题目一&#xff1a;移除元素 题目要求&#xff1a; 画图分析&#xff1a; 代码实现&#xff1a; struct ListNode* removeElements(struct List…

基于chatgpt动手实现一个ai_translator

动手实现一个ai翻译 前言 最近在极客时间学习《AI 大模型应用开发实战营》&#xff0c;自己一边跟着学一边开发了一个进阶版本的 OpenAI-Translator&#xff0c;在这里简单记录下开发过程和心得体会&#xff0c;供有兴趣的同学参考&#xff1b; ai翻译程序 版本迭代 在学习…

Synchronized八锁

/** * Description: 8 锁 * 1 标准访问&#xff0c;先打印短信还是邮件 ------sendSMS ------sendEmail 2 停 4 秒在短信方法内&#xff0c;先打印短信还是邮件 ------sendSMS ------sendEmail 3 新增普通的 hello 方法&#xff0c;是先打短信还是 hello ------getHello ------…

Idea中使用statement接口对象,显示mysql版本号,所有库和表名

使用statement 接口对象&#xff0c;进行以下操作&#xff1a; 显示数据库版本号显示所有库显示所有库中的table表 显示数据库版本号&#xff1a; public class StatementDemo {Testvoid showall(){try{Statement st conn.createStatement();ResultSet rs st.executeQuery(…

pytest fixture 常用参数

fixture 常用的参数 参数一&#xff1a;autouse&#xff0c;作用&#xff1a;自动运行&#xff0c;无需调用 举例一&#xff1a;我们在类中定义一个function 范围的fixture; 设置它自动执行autouseTrue&#xff0c;那么我们看下它执行结果 输出&#xff1a; 说明&#xff1a;…

Leetcode-每日一题【剑指 Offer 12. 矩阵中的路径】

题目 单词必须按照字母顺序&#xff0c;通过相邻的单元格内的字母构成&#xff0c;其中“相邻”单元格是那些水平相邻或垂直相邻的单元格。同一个单元格内的字母不允许被重复使用。 例如&#xff0c;在下面的 34 的矩阵中包含单词 "ABCCED"&#xff08;单词中的字母…

CUDA执行模型

一、CUDA执行模型概述 二、线程束执行 1. 线程束与线程块 线程束是SM中基本的执行单元。 当一个线程块的网格被启动后&#xff0c;网格中的线程块分布在SM中。 一旦线程块被调度到一个SM中&#xff0c;线程块中的线程会被进一步划分成线程束。 一个线程束由32个连续的线程…

基于自适应曲线阈值和非局部稀疏正则化的压缩感知图像复原研究【自适应曲线阈值去除加性稳态白/有色高斯噪声】(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

什么是媒体代发布?媒体代发布注意事项

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 媒体代发布是指将新闻稿或其他宣传内容委托给专业的媒体代理机构或公司进行发布和推广的活动。这些机构通常拥有丰富的媒体资源、人脉和经验&#xff0c;能够更好地将信息传递给目标受众…

C语言 指针与内存之间的关系

一、内存与字节 一个内存单元一个字节一个地址 整型 int 类型中int类型的字节数是4 且一个字节表示八个bite位 一个二进制数位有着32个bite 所以又可以表示为&#xff1a;一个字节 8个比特位 32位数的二进制数位的八分之一 例如&#xff1a; int a 10&#xff1b; 该表达式…

项目实战 — 消息队列(9){编写demo程序}

消息队列服务器核心功能就是&#xff0c;提供了虚拟主机&#xff0c;交换机&#xff0c; 队列&#xff0c;消息等概念的管理&#xff0c;实现三种典型的消息转发方式&#xff0c;可以实现跨主机/服务器之间的生产者消费模型。 这里&#xff0c;就编写一个demo&#xff0c;实现…

JAVA多线程和并发基础面试问答(翻译)

JAVA多线程和并发基础面试问答(翻译) java多线程面试问题 1. 进程和线程之间有什么不同&#xff1f; 一个进程是一个独立(self contained)的运行环境&#xff0c;它可以被看作一个程序或者一个应用。而线程是在进程中执行的一个任务。Java运行环境是一个包含了不同的类和程序…

苏州OV泛域名RSA加密算法https

RSA加密算法是一种非对称加密算法&#xff0c;它被广泛应用于信息安全领域。与对称加密算法不同&#xff0c;RSA加密算法使用了两个密钥&#xff0c;一个公钥和一个私钥。公钥可以公开&#xff0c;任何人都可以使用它加密信息&#xff0c;但只有私钥的持有者才能解密信息。RSA加…

php如何对接伪原创api

在了解伪原创api的各种应用形态之后&#xff0c;我们继续探讨智能写作背后的核心技术。需要说明的是&#xff0c;智能写作和自然语言生成、自然语言理解、知识图谱、多模算法等各类人工智能算法都有紧密的关联&#xff0c;在百度的智能写作实践中&#xff0c;常根据实际需求将多…

全球劳动力革命,Papaya Global 打破薪资界限

员工需求和劳动力结构的进一步变化&#xff0c;只会增加对更加自动化和全面的全球薪资解决方案的需求。 远程工作潮流与全球劳动力的蓬勃发展&#xff0c;使得企业在全球范围内&#xff0c;寻找最优秀的人才成为可能。然而&#xff0c;随之而来的复杂薪资管理挑战&#xff0c;也…

优雅地处理RabbitMQ中的消息丢失

目录 一、异常处理 二、消息重试机制 三、错误日志记录 四、死信队列 五、监控与告警 优雅地处理RabbitMQ中的消息丢失对于构建可靠的消息系统至关重要。下面将介绍一些优雅处理消息丢失的方案&#xff0c;包括异常处理、重试机制、错误日志记录、死信队列和监控告警等。…

BUUCTF题目Web部分wp(持续更新)

关于SQL注入的一些通用办法 可以访问哪些表 如有权限&#xff0c;查询当前用户可以访问的所有表 --Oracle查询当前用户可访问的所有表 select owner&#xff0c; table_name from all_tables order by table_name; --MySQL查询用户可访问的所有数据库和表 select table_sche…

爬虫017_urllib库_get请求的quote方法_urlencode方法_---python工作笔记036

按行来看get请求方式 比如这个地址 上面这个地址复制粘贴过来以后 可以看到周杰伦变成了一堆的Unicode编码了 所以这个时候我们看,我们说https这里,用了UA反爬,所以这里 我们构建一个自定义的Request对象,里面要包含Us

电脑mfc140u.dll丢失的怎么办呢?这个方法亲测可以解决

修复mfc140u.dll是我最近遇到的一个技术问题&#xff0c;虽然在解决过程中遇到了一些困难&#xff0c;但最终的成功修复让我对技术的力量有了更深的体会。 首先&#xff0c;我想谈谈遇到问题时的困惑。当我尝试运行一个应用程序时&#xff0c;突然弹出一个错误提示&#xff0c;…

YAMLException: java.nio.charset.MalformedInputException: Input length = 1

springboot项目启动的时候提示这个错误&#xff1a;YAMLException: java.nio.charset.MalformedInputException: Input length 1 根据异常信息提示&#xff0c;是YAML文件有问题。 原因是yml配置文件的编码有问题。 需要修改项目的编码格式&#xff0c;一般统一为UTF-8。 或…