使用 Kafka 和 CDC 将数据从 MongoDB Atlas 流式传输到 SingleStore Kai

SingleStore 提供了变更数据捕获 (CDC) 解决方案,可将数据从 MongoDB 流式传输到 SingleStore Kai。在本文中,我们将了解如何将 Apache Kafka 代理连接到 MongoDB Atlas,然后使用 CDC 解决方案将数据从 MongoDB Atlas 流式传输到 SingleStore Kai。我们还将使用 Metabase 为 SingleStore Kai 创建一个简单的分析仪表板。

介绍

CDC 是一种跟踪数据库或系统中发生的更改的方法。SingleStore 现在提供了与 MongoDB 配合使用的 CDC 解决方案。

为了演示 CDC 解决方案,我们将使用Kafka 代理将数据流式传输到 MongoDB Atlas 集群,然后使用 CDC 管道将数据从 MongoDB Atlas 传播到 SingleStore Kai。我们还将使用 Metabase 创建一个简单的分析仪表板。

图 1 显示了我们系统的高级架构。

高层架构

图 1. 高级架构(来源:SingleStore)。

我们将在以后的文章中重点介绍使用 CDC 解决方案的其他场景。

MongoDB Atlas

我们将在 M0 沙箱中使用 MongoDB Atlas。我们将在Database Access下配置具有atlasAdmin权限的管理员用户。我们将暂时允许从网络访问下的任何地方(IP 地址 0.0.0.0/0)进行访问。我们将记下用户名密码主机

Apache Kafka

我们将配置 Kafka 代理将数据流式传输到MongoDB Atlas中。我们将使用 Jupyter Notebook 来实现此目的。

首先,我们将安装一些库:

!pip install pymongo kafka-python --quiet

接下来,我们将连接到 MongoDB Atlas 和 Kafka 代理:

from kafka import KafkaConsumer
from pymongo import MongoClienttry:client = MongoClient("mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority")db = client.adtechprint("Connected successfully")
except:print("Could not connect")consumer = KafkaConsumer("ad_events",bootstrap_servers = ["public-kafka.memcompute.com:9092"]

我们将用我们之前从 MongoDB Atlas 保存的值替换<username>,<password>和。<host>

最初,我们将 100 条记录加载到 MongoDB Atlas 中,如下所示:

MAX_ITERATIONS = 100for iteration, message in enumerate(consumer, start = 1):if iteration > MAX_ITERATIONS:breaktry:record = message.value.decode("utf-8")user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("\t"))events_record = {"user_id": int(user_id),"event_name": event_name,"advertiser": advertiser,"campaign": int(campaign.split()[0]),"gender": gender,"income": income,"page_url": page_url,"region": region,"country": country}db.events.insert_one(events_record)except Exception as e:print(f"Iteration {iteration}: Could not insert data - {str(e)}")

数据应该成功加载,我们应该看到一个名为 的数据库,adtech其中包含一个名为 的集合events。集合中的文档在结构上应类似于以下示例:

_id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
user_id: 3857963415
event_name: "Impression"
advertiser: "Sherwin-Williams"
campaign: 13
gender: "Female"
income: "25k and below",
page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
region: "Michigan"
country: "US"
这些文档代表广告活动事件。该events集合存储 的详细信息advertiser以及campaign有关用户的各种人口统计信息,例如genderincome

SingleStore Kai

上一篇文章介绍了创建免费 SingleStoreDB 云帐户的步骤。我们将使用以下设置:

  • 工作区组名称: CDC 演示组
  • 云提供商: AWS
  • 区域:美国东部 1(弗吉尼亚北部)
  • 工作区名称: cdc-demo
  • 尺码: S-00
  • 设置:
    - SingleStore Kai 选择

一旦工作区可用,我们将记下密码主机该主机可从CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host获取。稍后我们将需要元数据库的此信息。我们还将通过在CDC 演示组 > 防火墙下配置防火墙来暂时允许从任何地方进行访问。

从左侧导航窗格中,我们选择DEVELOP > SQL Editor来创建adtech数据库link,如下所示:

CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;DROP LINK adtech.link;CREATE LINK adtech.link AS MONGODB
CONFIG '{"mongodb.hosts": "<primary>:27017, <secondary>:27017, <secondary>:27017","collection.include.list": "adtech.*","mongodb.ssl.enabled": "true","mongodb.authsource": "admin","mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user": "<username>","mongodb.password": "<password>"}';CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;
我们将用我们之前从 MongoDB Atlas 保存的值替换<username>和。<password>我们还需要将<primary><secondary>和的值替换<secondary>为 MongoDB Atlas 中每个值的完整地址。

我们现在将检查是否有任何表,如下所示:

SHOW TABLES;

这应该显示一张名为events

+------------------+
| Tables_in_adtech |
+------------------+
| events           |
+------------------+

我们将检查表的结构:

DESCRIBE events;

输出应如下所示:

+-------+------+------+------+---------+-------+
| Field | Type | Null | Key  | Default | Extra |
+-------+------+------+------+---------+-------+
| _id   | text | NO   | UNI  | NULL    |       |
| _more | JSON | NO   |      | NULL    |       |
+-------+------+------+------+---------+-------+

接下来,我们将检查是否有pipelines

SHOW PIPELINES;

这将显示events当前调用的一个管道Stopped

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Stopped | False     |
+---------------------+---------+-----------+

现在我们将启动events管道:

START ALL PIPELINES;

并且状态应更改为Running

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Running | False     |
+---------------------+---------+-----------+

如果我们现在运行以下命令:

SELECT COUNT(*) FROM events;

它应该返回 100 作为结果:

+----------+
| COUNT(*) |
+----------+
|      100 |
+----------+

我们将检查表中的一行events,如下所示:

SELECT * FROM events LIMIT 1;

输出应类似于以下内容:

+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| _id                                  | _more                                                                                                                                                                                                                                                                   |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| {"$oid": "64ec906d0e8c0f7bcf72a8f7"} | {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416} |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

CDC 解决方案已成功连接到 MongoDB Atlas 并将所有 100 条记录复制到 SingleStore Kai。

现在让我们使用 Metabase 创建一个仪表板。

元数据库

上一篇文章描述了如何安装、配置和创建元数据库连接的详细信息。我们将使用前一篇文章中使用的查询的细微变化来创建可视化。

1. 活动总数

SELECT COUNT(*) FROM events;

2. 各地区活动

SELECT _more::country AS `events.country`, COUNT(_more::country) AS 'events.countofevents'
FROM adtech.events AS events
GROUP BY 1;

3. Top 5 广告商活动

SELECT _more::advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::advertiser LIKE '%Subway%' OR _more::advertiser LIKE '%McDonals%' OR _more::advertiser LIKE '%Starbucks%' OR _more::advertiser LIKE '%Dollar General%' OR _more::advertiser LIKE '%YUM! Brands%' OR _more::advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;

4. 按性别和收入划分的广告访问者

SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASEWHEN xx.z___min_rank = xx.z___rank THEN 1ELSE 0END AS z__is_highest_ranked_cellFROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rankFROM (SELECT *, RANK() OVER (ORDER BY CASEWHEN bb.z__pivot_col_rank = 1 THEN (CASEWHEN bb.`events.count` IS NOT NULL THEN 0ELSE 1END)ELSE 2END, CASEWHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`ELSE NULLEND DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rankFROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASEWHEN ww.`events.gender` IS NULL THEN 1ELSE 0END, ww.`events.gender`) AS z__pivot_col_rankFROM (SELECT _more::gender AS `events.gender`, _more::income AS `events.income`, COUNT(*) AS `events.count`FROM adtech.events AS eventsWHERE (_more::income <> 'unknown' OR _more::income IS NULL)GROUP BY 1, 2) ww) bbWHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;

图 2 显示了 AdTech 仪表板上图表大小和位置的示例。我们将自动刷新选项设置为 1 分钟。

图 2.最终仪表板。

图 2.最终仪表板。

如果我们通过更改 使用 Jupyter Notebook 将更多数据加载到 MongoDB Atlas 中  MAX_ITERATIONS,我们将看到数据传播到 SingleStore Kai 以及 AdTech 仪表板中反映的新数据。

总结

在本文中,我们创建了一个 CDC 管道,以使用 SingleStore Kai 增强 MongoDB Atlas。正如多个基准测试所强调的那样,SingleStore Kai 因其卓越的性能而可用于分析。我们还使用 Metabase 创建了一个快速的可视化仪表板,以帮助我们深入了解我们的广告活动。


作者:Akmal Chaudhri ​

更多技术干货请关注公号【云原生数据库

squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。

irds.cn,多数据库管理平台(私有云)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/598078.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IDEA好用插件

CodeGlance Pro 右侧代码小地图 Git Commit Template git提交信息模板 IDE Eval Reset 无限试用IDEA Maven Helper 图形化展示Maven项 One Dark theme 好看的主题 SequenceDiagram 展示方法调用链 Squaretest 生成单元测试 Translation 翻译 Lombok lombok插件…

【开题报告】基于JavaWeb的年货销售系统的设计与实现

1.选题背景 年货销售是中国传统文化的一部分&#xff0c;也是中国人过年必备的习俗之一。随着互联网的发展&#xff0c;越来越多的人选择在网上购买年货&#xff0c;以节省时间和精力。为了满足人们对年货的购买需求&#xff0c;设计一个基于JavaWeb的年货销售系统具有重要意义…

leecode | 829连续整数求和

给一个整数n 求连续整数的和等于n 的个数 这道题 是一个数论的思想 解决思路&#xff1a; 数必须是连续的&#xff0c;可以转化成一个通用的公式&#xff0c;以101为例做一般性推导&#xff0c;&#xff1a; 101 &#xff1d; 101 &#xff1d; 50 &#xff0b; 51 &#xff1d…

AQS原来是这么设计的,泰裤辣!

缘起 每门编程语言基本都离不开并发问题&#xff0c;Java亦如此。谈到Java的并发就离不开Doug lea老爷子贡献的juc包&#xff0c;而AQS又是juc里面的佼佼者 因此今天就一起来聊聊AQS 概念 AQS是什么&#xff0c;这里借用官方的话 Provides a framework for implementing blo…

web3: 智能合约

目录 智能合约的历史什么是智能合约如何运作?智能合约的应用代币标准ERC-20什么是 ERC-20?功能ERC-20 代币接口ERC-721什么是 ERC-721?功能ERC-721 代币接口:ERC-165ERC-777什么是 ERC-777&

第11课 实现桌面与摄像头叠加

在上一节&#xff0c;我们实现了桌面捕获功能&#xff0c;并成功把桌面图像和麦克风声音发送给对方。在实际应用中&#xff0c;有时候会需要把桌面与摄像头图像叠加在一起发送&#xff0c;这节课我们就来看下如何实现这一功能。 1.备份与修改 备份demo10并修改demo10为demo11…

前端跨域问题的解决思路

目录 前言 跨域问题的解决思路 一般跨域的解决方案 前言 做了一个简单页面&#xff0c;做了一些数据埋点&#xff0c;想通过企业微信机器人来推送数据&#xff0c;遇到了一些问题&#xff0c;顺便记录下。 跨域问题的解决思路 由于是项目比较简单&#xff0c;直接使用了aj…

Sentinel整合OpenFeign

1、配置文件 feign:sentinel:enabled: true 2、 编写一个工厂类 import com.cart.cartservice.client.ItemClient; import com.cart.cartservice.entity.Item; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.openfeign.FallbackFactory; import org.sp…

《Effective C++》《Resource Management》

文章目录 13、term13:Use objects to manage resources14、term14:Think carefully about copying behavior in resource-managing classes15、term15:Provide access to raw resources in resource-managing classes法一&#xff1a; 使用智能指针的get进行显示转换法二&#…

Redis 连接 命令

目录 1.Redis Echo 命令 - 打印字符串简介语法可用版本: > 1.0.0返回值: 返回字符串本身。 示例 2.Redis Select 命令 - 切换到指定的数据库简介语法可用版本: > 1.0.0返回值: 总是返回 OK 。 示例 3.Redis Ping 命令 - 查看服务是否运行简介语法可用版本: > 1.0.0返回…

2024-2030年中国磁化率仪行业应用前景与投资价值评估分析报告

2024-2030年中国磁化率仪行业应用前景与投资价值评估分析报告 &&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& 《报告编号》: BG471760 《出…

Apache 网页优化

目录 1.网页压缩与缓存 1.1 网页压缩 1. gzip 介绍 2. Http的压缩过程 3. Apache的压缩模块 4. mod_deflate模块 1.2 网页缓存 1. 配置 mod_expires 模块启用 2. 隐藏版本信息 2.1 配置Apache隐藏版本信息 2.2 Apache 防盗链 1. 配置防盗链 2.检查是否安装mod_re…

景联文科技GPT教育题库:AI教育大模型的强大数据引擎

GPT-4发布后&#xff0c;美国奥数队总教练、卡耐基梅隆大学数学系教授罗博认为&#xff0c;这个几乎是用“刷题”方式喂大的AI教育大模型的到来&#xff0c;意味着人类的刷题时代即将退出历史舞台。 未来教育将更加注重学生的个性化需求和多元化发展&#xff0c;借助GPT和AI教育…

你想过在 C++ 中使用契约和反射特性吗?

以下内容为本人的学习笔记&#xff0c;如需要转载&#xff0c;请声明原文链接微信公众号「ENG八戒」https://mp.weixin.qq.com/s/fOEG22dQqKSpsZmk8z6w6g ISO/IEC C 技术委员会主持人 Herb Sutter 报告称&#xff0c;C26 将具有新的功能&#xff0c;包括契约和反射。 该委员会…

Java异常简单介绍

文章目录 1. 异常分类和关键字1.1 分类1.2 关键字 2. Error2.1 Error定义2.2 常见的Error2.2.1 VirtualMachineError2.2.2 ThreadDeath2.2.3 LinkageError2.2.4 AssertionError2.2.5 InternalError2.2.6 OutOfMemoryError2.2.6.1 OOM原因2.2.6.2 OutOfMemoryError会导致宕机吗 …

大创项目推荐 深度学习卫星遥感图像检测与识别 -opencv python 目标检测

文章目录 0 前言1 课题背景2 实现效果3 Yolov5算法4 数据处理和训练5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **深度学习卫星遥感图像检测与识别 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐…

LeetCode刷题--- 解码方法

个人主页&#xff1a;元清加油_【C】,【C语言】,【数据结构与算法】-CSDN博客 个人专栏 力扣递归算法题 http://t.csdnimg.cn/yUl2I 【C】 ​​​​​​http://t.csdnimg.cn/6AbpV 数据结构与算法 ​​​http://t.csdnimg.cn/hKh2l 前言&#xff1a;这个专栏主要讲述动…

Spring中的依赖注入(DI)的几种方法的使用

文章目录 Spring中的依赖注入&#xff08;DI&#xff09;的几种方法的使用1、构造函数注入业务层实现类application.xml 2、set方法注入业务层实现类application.xml 3、自动注入业务层实现类application.xml 4、注入集合类型的属性业务层实现类application.xml 测试类 Spring中…

ARM CCA机密计算架构软件栈之Realm资源管理

领域资源管理的基本原则是主机保持控制。这意味着主机决定使用哪个物理内存来支持给定的领域中间物理地址&#xff08;IPA&#xff09;&#xff0c;或者存储RMM使用的Realm元数据的给定片段。 主机始终可以重新获取此物理内存&#xff0c;而无需得到领域的同意。同样&#xff…

JavaScript基本使用方法

JavaScript 是一种用于在网页上添加交互性和动态功能的脚本语言。下面是 JavaScript 的基本使用方法&#xff1a; 1.内嵌方式&#xff1a; • 在 HTML 文件中使用 <script> 标签来嵌入 JavaScript 代码。 • 可以在 <script> 标签内部编写 JavaScript 代码&#…