使用 bend-ingest-kafka 将数据流实时导入到 Databend

作者:韩山杰

Databend Cloud 研发工程师

https://github.com/hantmac

Databend是一个开源、高性能、低成本易于扩展的新一代云数据仓库。bend-ingest-kafka 是一个专为 Databend 设计的实时数据导入工具,它允许用户从 Apache Kafka 直接将数据流导入到 Databend 中,实现数据的实时分析和处理。

为什么选择bend-ingest-kafka?

  • 实时性: 能够实时地从 Kafka 中读取数据并导入到 Databend。
  • 高吞吐量: 支持高并发的数据导入,满足大规模数据处理的需求。
  • 易用性: 提供了简单直观的配置方式,便于用户快速上手。
  • 灵活性: 可二次开发支持多种数据格式和自定义转换逻辑。

环境准备

在使用 bend-ingest-kafka 之前,需要确保以下环境已经搭建好:

  • 一个运行中的 Databend 实例或者在 Databend Cloud 中创建一个 warehouse(推荐)。
  • 一个配置好的 Apache Kafka 集群。
  • 已经安装的 bend-ingest-kafka。

快速开始

Step 1: 安装 bend-ingest-kafka

可以从 Databend 的官方 GitHub 仓库 release 页面 下载对应 OS 架构的 bend-ingest-kafka 的可执行二进制文件,或者直接执行命令安装最新版本。

go install  github.com/databendcloud/bend-ingest-kafka@latest

Step 2: 配置 bend-ingest-kafka

配置文件通常包括 Kafka 的连接以及配置信息、Databend 的连接信息以及数据转换的逻辑。以下是一个简单的配置示例:

{"kafkaBootstrapServers": "localhost:9092","kafkaTopic": "ingest_test","KafkaConsumerGroup": "test","mockData": "","isJsonTransform": false,"databendDSN": "https://cloudapp:password@tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443","databendTable": "default.kfk_test","batchSize": 10,"batchMaxInterval": 5,"dataFormat": "json","workers": 1,"copyPurge": false,"copyForce": false,"disableVariantCheck": true,"minBytes": 1024,"maxBytes": 1048576,"maxWait": 10,"useReplaceMode": false,"userStage": "~"
}

具体的配置参数可以参考 Parameter References,这里对几个比较重要的参数展开解释。

  • isJsonTransform: 默认为 true,将 Kafka Json 数据逐字段转换为 Databend 表数据。通过设置 isJsonTransform 为 true 来使用此模式。如果设置为 false 的话,系统将在 Databend 中自动创建一个 raw table, 列包括 (uuid, koffset, kpartition, raw_data, record_metadata, add_time),并将原始数据导入此表。其中 raw_data 为导入的 kafka Json 数据,record_metadata 包含了本条数据的 kafka 元信息 - topicpartitionoffsetcreate_timekey,方便用户查询。
  • useReplaceMode: useReplaceMode 是一种去重模式,开启后如果表中已存在数据,新数据将替换旧数据。但 useReplaceMode 仅在 isJsonTransform 为 false 时支持,因为它需要在目标表中添加 koffset 和 kpartition 字段。在这种模式下,系统可以实现 exactly once 的同步语义,否则为 at-least-once 语义。
  • userStage: 用户的自定义 external stage name。

Step 3: 启动数据导入

这里使用 raw-data 模式作演示。

Kafka 的 Json 数据示例为:
{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}
模拟 kafka 生产数据

可以使用下面的脚本快速生成 kafka json 数据:

from confluent_kafka import Producer# 创建一个Producer实例
p = Producer({'bootstrap.servers': 'localhost:9092'})for i in range(1000000):json_data = '{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}'p.produce('ingest_test', json_data)print(i)p.flush()
使用配置文件启动 bend-ingest-kafka

默认读取 ./config/conf.json 配置文件,开始将 Kafka 中的数据导入到 Databend。

./bend-ingest-kafka

启动后可以看到 log 和 metrics:

到 Databend 中可以查询到已经同步的数据:

 由于 raw_data 和 record_metadata 的字段格式都是 JSON ,所以可以很灵活地做一些数据分析:

select record_metadata['partition'] p,min(record_metadata['offset']::bigint) o1,max(record_metadata['offset']::bigint) o2,o2-o1+1 sub_count,count(distinct record_metadata['offset']) distinct_cnt,count(1) cnt
from default.kfk_test 
group by p
order by p;

高级特性

  • 错误处理: 能够处理数据导入过程中的异常,并提供重试机制。
  • 监控与日志: 提供详细的日志记录和监控指标,方便跟踪数据导入的状态。

结语

bend-ingest-kafka 作为一个强大的工具,为 Databend 用户提供了从 Kafka 实时导入数据的能力。通过本文的介绍,用户应该能够快速上手并利用这个工具来实现实时数据处理的需求。

关于 Databend

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。
👨‍💻‍ Databend Cloud:https://databend.cn

📖 Databend 文档:https://docs.databend.cn/

💻 Wechat:Databend

✨ GitHub:https://github.com/datafuselabs/databend

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/40152.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Konva.js 使用指南

简介 Konva.js 是一个用于创建 2D 图形的高性能 JavaScript 库,专注于提供丰富的 API 和灵活的图层管理。它适用于数据可视化、游戏开发和其他需要复杂图形和动画的应用场景。本文将介绍 Konva.js 的基本使用方法,包括初始化、绘制基本图形、处理事件和…

密码学原理精解【4】

文章目录 Z 256 下的希尔密码 Z_{256}下的希尔密码 Z256​下的希尔密码概述exampleK密钥选择 ∣ K ∣ − 1 |K|^{-1} ∣K∣−1 K ∗ K^* K∗ K − 1 K^{-1} K−1 Z 256 下的希尔密码 Z_{256}下的希尔密码 Z256​下的希尔密码 概述 m ≥ 2 为正整数,表示 m 维向量空…

linux系统中的各种命令的解释和帮助(含内部命令、外部命令)

目录 一、说明 二、命令详解 1、帮助命令的种类 (1)help用法 (2)--help用法 2、如何区别linux内部命令和外部命令 三、help和—help 四、man 命令 1、概述 2、语法和命令格式 (1)man命令的格式&…

Spring Cloud中的服务熔断与降级

Spring Cloud中的服务熔断与降级 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天我们将深入探讨在Spring Cloud中的服务熔断与降级策略。 一、什么是服务熔…

qt6 通过http查询天气的实现

步骤如下: cmakelist 当中,增加如下配置 引入包 访问远端api 解析返回的数据 cmakelist 当中,增加如下配置,作用是引入Network库。 引入包 3、访问远端api void Form1::on_pushButton_clicked() {//根据URL(http://t.weather.…

接口测试流程及测试点!

一、什么时候开展接口测试 1.项目处于开发阶段,前后端联调接口是否请求的通?(对应数据库增删改查)--开发自测 2.有接口需求文档,开发已完成联调(可以转测),功能测试展开之前 3.专…

PHP 面向对象编程(OOP)入门指南

面向对象编程(Object-Oriented Programming,简称OOP)是一种编程范式,通过使用对象来设计和组织代码。PHP作为一种广泛使用的服务器端脚本语言,支持面向对象编程。本文将介绍PHP面向对象编程的基本概念和用法&#xff0…

2.3.2 主程序和外部IO交互 (文件映射方式)----IO Client实现

2.3.2 主程序和外部IO交互 (文件映射方式)----IO Client C实现 和IOServer主要差别: 1 使用Open_Client 连接 2 一定要先打开IOServer,再打开IO_Client 效果显示 1 C 代码实现 1.1 shareddataClient.h 头文件中引用 和sharedd…

手写starter写核心

文章目录 使用cn.smart 不能使用com 避免在yml配置的时候 开启或者 写万能接口实现调整日志级别写了core核心 但是没有引入其他功能组件,就是注解可以使用但是功能没有增,所以core的作用就是写入注解从新写starter 第一步提取注解 写到核心包里面,看其他包 新建模块 使用cn.s…

开源发布Whistle: 基于弱音素监督推进数据高效多语言和跨语言语音识别

论文地址:https://arxiv.org/abs/2406.02166 开源代码及模型: https://github.com/thu-spmi/CAT/blob/master/egs/cv-lang10/readme.md 摘 要 Whistle和Whisper一样,均采用弱监督方式训练ASR基座模型。不同于Whisper采用基于子词&#xff0…

全志T113系列芯片参数|性能|功耗|资料|选型-远众技术

全志T113-i和T113-S3/S4处理器,凭借高性价比、丰富接口、工业级性能等特性,适合不同应用场景。 一、T113-i:强大性能与丰富接口的理想嵌入式处理器 全志T113系列中的T113-i是一款引人注目的高性能、低成本嵌入式处理器,专为各种…

嵌入式面试需要注意的问题!

1.在嵌入式和IT行业,技术更新换代非常快。因此,求职者必须时刻关注行业的最新动向和发展趋势。了解当前市场上哪些技术和岗位需求量大,哪些新兴技术值得学习和掌握,都是至关重要的。 🔸嵌入式行业:嵌入式系…

1、项目基础

1、系统架构图 2、项目业务组成 3、技术选型 3.1 前端 vue3 ts sass axios 3.2后端 spring-cloud系列 gateway openfeign spring-cloud-alibaba系列 nacos sentinel seata

基于矩阵分解算法的评分预测实现---信息检索课设以及所涉及的深度学习原理

一、实验环境 Windows,Python 3 Python作为主要编程语言,使用Python的Pandas、NumPy、Matplotlib等库 二、实验内容 主要任务 查阅相关资料,了解矩阵分解算法的基本概念、应用场景及其难点。重点了解SVD(Singular Value Decomposition,奇异值分解)系列方法。掌握Pyth…

使用Python进行文件批量重命名:轻松实现文件管理

哈喽,大家好,我是木头左! 引言 在日常生活和工作中,经常需要对大量的文件进行重命名。手动一个个地修改文件名不仅耗时耗力,还容易出错。为了解决这个问题,可以利用Python编程语言来实现文件的批量重命名。本文将介绍如何使用Python编写一个简单的脚本,实现对文件进行批…

windows@无密码的本地用户账户相关问题@仅用用户名免密登录远程桌面登录和控制@无密码用户访问共享文件夹以及挂载问题

文章目录 abstract此用户无法登录账户被禁用问题访问共享文件夹时带上凭据错误案例和解决 两类登录方式控制台登录与远程登录的区别为什么限制空密码账户只允许控制台登录相关安全策略如何修改该策略注意事项 启用允许被免密登录功能使用空密码进行远程桌面连接设置远程桌面链接…

硅纪元视角 | 1 分钟搞定 3D 创作,Meta 推出革命性 3D Gen AI 模型

在数字化浪潮的推动下,人工智能(AI)正成为塑造未来的关键力量。硅纪元视角栏目紧跟AI科技的最新发展,捕捉行业动态;提供深入的新闻解读,助您洞悉技术背后的逻辑;汇聚行业专家的见解,…

2.2.2 C#中显示控件BDPictureBox 的实现----DisplayContext说明续

2.2.2 C#中显示控件BDPictureBox 的实现----DisplayContext说明续 1 主要目标描述 实现图片缩放信息和中心点位置偏移信息的管理,外部调用者只要输入放大,缩小,位置偏移,其他全部由displayContext 实现 2 公共的函数部分&#…

第二天:ALOAM前端讲解【第2部分】

三、scan2scan 3. 帧间匹配 特征关联与损失函数计算 (1)线特征 点到线的距离公式: d ϵ = ∣ ( X ~ ( k +

网安小贴士(2)OSI七层模型

一、前言 OSI七层模型是一种网络协议参考模型,用于描述计算机网络体系结构中的不同层次和功能。它由国际标准化组织 (ISO) 在1984年开发并发布。 二、定义 OSI七层模型,全称为开放式系统互联通信参考模型(Open Systems Interconnection Refe…