RisingWave分布式SQL流处理数据库调研

概述

RisingWave是一款分布式SQL流处理数据库,旨在帮助用户降低实时应用的的开发成本。作为专为云上分布式流处理而设计的系统,RisingWave为用户提供了与PostgreSQL类似的使用体验,官方宣称具备比Flink高出10倍的性能(指throughput)以及更低的成本。RisingWave开发只需要关注SQL开发,而不需要像Flink那样去关注

  • RisingWave与Flink不同的是,RisingWave既可以做流处理也可以存储;而Flink只是流处理框架,而不能存储数据,计算后的数据需要存储到外部系统中。官方宣称可以完全替代FlinkSQL。
  • RisingWave与批数据库不同的是,RisingWave可以做流处理,按预定义逻辑实时处理数据,官网宣称可以做到流批一体,批数据库只能处理批数据。

使用场景

RisingWave 的强项是流处理,底层存储为行存,更加适合对已存储的数据高并发点查,而并非全表扫描。RisingWave 的主要使用场景包括了监控、报警、实时动态报表、流式 ETL、机器学习特征工程等。其已经运用到金融交易、制造业、新媒体、物流等领域。
但是,RisingWave 不适合做分析型随机查询。为支持分析型随机查询,用户还需将数据导入到实时分析数据库中进行操作。不少用户将 RisingWave 与 ClickHouse、Apache Doris 等实时分析数据库组合使用:他们使用 RisingWave 做流计算,同时使用实时分析数据库进行分析型随机查询。RisingWave 已经支持到sink ClickHouse、Apache Doris等OLTP中,具体可以参考RisingWave Sink

注意:
RisingWave 不支持读写事务处理,但其支持只读事务。在生产中,使用 RisingWave 的最佳实践是将 RisingWave 放在事务型数据库的下游。RisingWave 通过 CDC 从事务型数据库中读取已经被序列化过的数据。

RisingWave 应用

部署

RisingWave 单机试玩模式

docker run -itd \
-p 4566:4566 \
-p 5691:5691 \
--privileged \
--name=risingwave \
risingwavelabs/risingwave:latest playground

RisingWave 单机 Docker Compose 部署模式(测试推荐这种模式部署,以下测试基于此种模式)

clone the risingwave repository.

git clone https://github.com/risingwavelabs/risingwave.git

进入docker目录

cd docker

启动RisingWave集群

#使用MinIO存储状态后端,standalone模式启动
export RW_IMAGE=risingwavelabs/risingwave:latest
export ENABLE_TELEMETRY=true
docker compose up -d

安装postgresql客户端

由于RisingWave兼容postgresql协议,所以通过postgresql客户端可以直接操作RisingWave
安装postgresql客户端

yum install -y postgresql

使用 psql 连接

psql -h localhost -p 4566 -d dev -U root

启动mysql并开启binlog

  • 启动mysql
# 查看详细默认配置docker run -it --rm mysql:5.7 --verbose --help#启动mysql server
docker run -d \
--name mysql5.7 \
--restart=always \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=123456 \
-v /data/mysql5.7/data:/var/lib/mysql \#数据文件
-v /data/mysql5.7/conf:/etc/mysql/conf.d \#配置文件
-v /data/mysql5.7/log:/var/log \#日志文件
mysql:5.7 \
--character-set-server=utf8mb4 \
--collation-server=utf8mb4_unicode_ci \
--log-bin=/var/lib/mysql/mysql-bin \#开启binlog配置
--server-id=2 #开启binlog配置
  • 链接mysql

docker exec -it mysql5.7 mysql -h127.0.0.1 -P3306 -p’123456’

  • 验证是否开启 binlog

show variables like ‘%log_bin%’;

  • 授权
--授权RisingWave作为slave访问mysql binlog
grant RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT on *.* to 'root'@'%' IDENTIFIED BY '123456';
--grant ALL PRIVILEGES on db01.* to 'root'@'%' IDENTIFIED BY '123456';
flush  privileges;
--取消授权,如有需要
REVOKE  GRANT OPTION on *.* FROM 'root'@'%';
REVOKE  ALL PRIVILEGES on *.* FROM 'root'@'%';
REVOKE  ALL PRIVILEGES on db01.* FROM 'root'@'%';
flush  privileges;
--查看授权
show grants for root@'%';

部署kafka

  • 启动kafka
# step-1
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper:latest
# step-2
# 启动Kafka,将以下的俩个192.168.1.100换为本身的IP地址bash
docker run  -d \
--name kafka \
--restart=always \
-p 8092:8092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.1.100:2181/kafka \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.100:8092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:8092 \
-t wurstmeister/kafka
  • 与kafka交互
#list
docker run -it --rm wurstmeister/kafka kafka-topics.sh --bootstrap-server 192.168.1.100:8092 --list
#create topic
docker run -it --rm wurstmeister/kafka kafka-topics.sh --bootstrap-server 192.168.1.100:8092 --create --replication-factor 1 --partitions 1 --topic test2
#producer
docker run -it --rm wurstmeister/kafka kafka-console-producer.sh --bootstrap-server 192.168.1.100:8092 --topic test1
#consumer
docker run -it --rm wurstmeister/kafka kafka-console-consumer.sh --bootstrap-server 192.168.1.100:8092 --topic test1
  • 或通过kcat与kafka交互
docker pull edenhill/kcat:1.7.1
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C -J
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C

RisingWave 使用demo

  1. 数据导出sink demo
-- create table
CREATE TABLE t1 (v1 int, v2 int) 
WITH (connector = 'datagen',fields.v1.kind = 'sequence',fields.v1.start = '1',fields.v2.kind = 'random',fields.v2.min = '-10',fields.v2.max = '10',fields.v2.seed = '1',datagen.rows.per.second = '10') ROW FORMAT JSON;
-- create sink
CREATE SINK test_sink_1
FROM t1 
WITH (properties.bootstrap.server = '192.168.1.100:8092',topic = 'test_sink_topic',connector = 'kafka',primary_key = 'v1'
)
FORMAT UPSERT ENCODE JSON;

查看kafka sink 结果

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C -J

  1. 连接器 source
--source 连接器
CREATE SOURCE IF NOT EXISTS source_1 (v1 integer,v2 integer,
)
WITH (connector='kafka',topic='test_sink_topic',properties.bootstrap.server='192.168.1.100:8092',scan.startup.mode='latest',
) FORMAT PLAIN ENCODE JSON;
-- table连接器
CREATE TABLE IF NOT EXISTS table_1 (v1 integer,v2 integer,
)
WITH (connector='kafka',topic='test_sink_topic',properties.bootstrap.server='192.168.1.100:8092',scan.startup.mode='latest',
) FORMAT PLAIN ENCODE JSON;
  1. Change Data Capture (CDC) 直连 MySQL CDC
    --mysql ddl:create database db01;use db01;CREATE TABLE orders (order_id int(11) NOT NULL AUTO_INCREMENT,price decimal(11),PRIMARY KEY (order_id));-- risingwave ddlCREATE TABLE orders (order_id int,price decimal,PRIMARY KEY (order_id)) WITH (connector = 'mysql-cdc',hostname = '192.168.1.100',port = '3306',username = 'root',password = '123456',database.name = 'db01',table.name = 'orders',);--mysql dmlinsert into orders(price) values(12),(10),(23);insert into orders(price) values(12),(10);update orders set price=100  where order_id=1;delete from orders where order_id=3;-- risingwave验证数据select * from orders ;
  1. 直接导出物化视图/表数据 (CREATE SINK FROM)
CREATE TABLE t11 (v1 int, v2 int) 
WITH (connector = 'datagen',fields.v1.kind = 'sequence',fields.v1.start = '1',fields.v2.kind = 'random',fields.v2.min = '-10',fields.v2.max = '10',fields.v2.seed = '1',datagen.rows.per.second = '10') ROW FORMAT JSON;create materialized view mv_t11 as select count(*) from t11;CREATE SINK sink1 FROM mv_t11 
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink1'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);

check结果

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink1 -C -J

  1. 导出 Query 的数据(CREATE SINK AS)
CREATE TABLE t11 (v1 int, v2 int) 
WITH (connector = 'datagen',fields.v1.kind = 'sequence',fields.v1.start = '1',fields.v2.kind = 'random',fields.v2.min = '-10',fields.v2.max = '10',fields.v2.seed = '1',datagen.rows.per.second = '10') ROW FORMAT JSON;CREATE SINK sink2 AS 
SELECT avg(v1) as avg_v1, avg(v2) as avg_v2 
FROM t1
WITH (connector='kafka',properties.bootstrap.server='192.168.1.100:8092',topic='t_sink2'
)
FORMAT PLAIN ENCODE JSON(force_append_only='true'
);

check结果

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink1 -C -J

总结

RisingWave 提供与 PostgreSQL 兼容的标准SQL接口。用户可以像使用 PostgreSQL 一样处理数据流。屏蔽了实时处理底层需要遇到的一些技术细节(状态存储,数据一致性,分布式集群扩展等),供应用方快速的开发实时数据流,进行流式ETL。具有以下特性:同步的实时性(可以保证实时的新鲜度,doris等OLAP引擎采用异步实时)、强一致性(doris等OLAP引擎仅提供最终一致性)、高可用、高并发、流处理语义、资源隔离。可以应用在一些数据看版,监控,实时指标等场景。

相关文章

github 仓库
官方文档
中文文档
创始人知乎主页
Slack

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/693542.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

快速清理_卸载docker_找到不用的进程_centos磁盘爆满_清理磁盘---Linux工作笔记071

查看大文件,并且按照大小排名 cd / | du -h |sort -hr|head -30 可以看到根据不用的结果进行删除 可以看到在/data/dict目录很大,里面的都可以删除 然后再去卸载docker,要不然,没有磁盘是卸载不了的 systemctl stop docker systemctl stop docker.socket yum remove docker-…

【2024上半年数学建模推荐】2024年第九届数维杯大学生数学建模挑战赛报名通知

2024上半年数模人必打的数学建模竞赛:数维杯全国大学生数学建模挑战赛已经开始报名。 赛题难度:四颗星 含金量:国家级二类 参赛对象:在校专科、本科、研究生 推荐理由:获奖率高,赛题难度比国赛略微简单…

Qt _day1

1.思维导图 2.设计一个简单登录界面 #include "mywidget.h"MyWidget::MyWidget(QWidget *parent): QWidget(parent) {this->setWindowTitle("原神启动"); // this->setStyleSheet("background-color:rgb(255,184,64)");this->setStyl…

XSS攻击原理与解决方法

参考: web安全之XSS攻击原理及防范_xss攻击原理与解决方法-CSDN博客 跨站脚本攻击(XSS)分类介绍及解决办法_反射型跨站脚本解决方案-CSDN博客 一、概述 XSS攻击是Web攻击中最常见的攻击方法之一,它是通过对网页注入可执行代码且成功地被浏…

ThinkPHP6中使用GatewayWorker

首先是先安装 composer require workerman/gateway-worker composer require workerman/gatewayclient下载demo 服务器开通TCP端口8282、1238 将Applications\YourApp目录随便放ThinkPHP6的哪个位置,我这里放在了app\gateway\ws目录中 配置composer.json "…

【竞技宝】DOTA2-喀山:莫言帕克毁天灭地 IG让一追二力克Neon

北京时间2024年2月21日,喀山未来运动会DOTA2项目在昨天迎来第二个比赛日。本日第二轮第二场比赛由IG对阵Neon。本场比赛两队在前两局各取一胜,决胜局IG的防守反击多次击溃Neon,最终IG让一追二击败Neon。以下是本场比赛的详细战报。 第一局: 首局比赛,IG在天辉方,Neon在夜魇方。…

c++try-catch块的使用和异常处理机制。异常的传播和捕获规则。

ctry-catch块的使用和异常处理机制。 在C中,try-catch块是一种异常处理机制,用于在程序执行期间捕获和处理可能发生的异常。try块用于包含可能抛出异常的代码,而catch块则用于捕获并处理这些异常。 以下是try-catch块的基本用法和异常处理机…

Python自动化部署与配置管理:Ansible与Docker

Ansible 和 Docker 是两种常用于自动化部署和配置管理的工具。Ansible 是一个基于 Python 的自动化运维工具,可以配置管理、应用部署、任务自动化等。而 Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中&…

算法项目(2)—— LSTM、RNN、GRU(SE注意力)、卡尔曼轨迹预测

本文包含什么? 项目运行的方式(包教会)项目代码LSTM、RNN、GRU(SE注意力)、卡尔曼四种算法进行轨迹预测.各种效果图运行有问题? csdn上后台随时售后.项目说明 本文实现了三种深度学习算法加传统算法卡尔曼滤波进行轨迹预测, 预测效果图 首先看下不同模型的指标: 模型RM…

unity学习(33)——角色选取界面(原版)

10ARPG网络游戏编程实践(十):角色选择UI及创建面板制作(一)(流畅)_哔哩哔哩_bilibili 角色选择界面教程中是这样的!(这个美工肯定是不能拿出去卖的,但是是有…

IP协议及相关技术协议

一、IP基本认识 1. IP的作用 IP在TCP/IP模型中处于网络层,网络层的主要作用是实现主机与主机之间的通信,而IP的作用是在复杂的网络环境中将数据包发送给最终目的主机。 2. IP与MAC的关系 简单而言,MAC的作用是实现“直连”的两个设备之通信…

77、Spring、Spring Boot和Spring Cloud的关系

77、Spring、Spring Boot和Spring Cloud的关系 随着 Spring、Spring Boot 和 Spring Cloud 的不断发展,越来越多的开发者加入 Spring 的大军中。对于初学者而言,可能不太了解 Spring、Spring Boot 和 Spring Cloud 这些概念以及它们之间的关系&#xff…

[IO复用] Windows IOCP的初步学习

文章目录 前言正文重叠 IO如何理解重叠IO:创建重叠IO重叠IO操作的返回值如何确认IO操作的结果 IOCP比重叠IO多了什么IOCP的流程IOCP和EPOLL的比较 参考 前言 提起IO复用,大部分人首先接触的都是Select、Poll、Epoll,但是在不同的系统中&…

10个行锁、死锁案例⭐️24张加锁分析图彻底搞懂Innodb行锁加锁规则!

10个行锁、死锁案例⭐️24张加锁分析图🚀彻底搞懂Innodb行锁加锁规则! 上篇文章 我们描述原子性与隔离性的实现,其中描述读操作解决隔离性问题的方案时还遗留了一个问题:写操作是如何解决不同的隔离性问题? 本篇文章…

linux CentOs 安装docker 推荐生产环境使用

目录 1. 在CentOs上安装docker所需的系统环境 2. 卸载旧版本 2.1 查看是否已安装docker 2.2 卸载已安装的docker 3. 安装方式 3.1 使用rpm存储库安装(推荐使用该方法) 3.2 从包中安装 4. 开始docker 1. 在CentOs上安装docker所需的系统环境 需要以下CentOS版本之一的维…

数据结构-邻接链表

介绍 邻接矩阵是运用较多的一种储存图的方法,但如果一张网图边数较少,就会出现二维矩阵中大部分数据为0的情况,浪费储存空间 为了避免空间浪费,也可以采用数组与链表结合的方式来存储图 假设有这样一张图 我们可以先用一个数组…

C#中的Async的异常处理

在C#的代码编写中可以通过try/catch来捕获Exception。然而当调用Async方法时需要特别注意 private void Start() {try{TestVoid();}catch (Exception e){Debug.LogException(e);} }private async void TestVoid() {var t Task.Delay(1);await t;throw new Exception("Te…

测试环境搭建整套大数据系统(四:ubuntu22.4创建普通用户)

一:创建用户,修改密码,增加sudo权限。 useradd dolphinscheduler #输入密码 passwd dolphinscheduler # 配置 sudo 免密 sed -i $adolphinscheduler ALL(ALL) NOPASSWD: NOPASSWD: ALL /etc/sudoers sed -i s/Defaults requirett/#Defa…

实现一个python代码编辑器

代码编辑器采用了monacoEditor,一个现成的编辑器。网上有很多文档介绍和开源项目,但是怎么说呢,跟着做,可以实现一个网页编辑器,可以高亮python的语法,但是没有python的提示,找不到可以参考的&a…

ADO.NET和EF框架性能对比

ADO.NET和Entity Framework(EF)框架在性能上有一些不同。总体来说,ADO.NET通常比EF具有更高的性能,特别是在执行大量数据访问操作时。下面是一些关于它们性能差异的要点: 数据库操作方式: ADO.NET&#xf…