使用 Kafka 和 CDC 将数据从 MongoDB Atlas 流式传输到 SingleStore Kai

SingleStore 提供了变更数据捕获 (CDC) 解决方案,可将数据从 MongoDB 流式传输到 SingleStore Kai。在本文中,我们将了解如何将 Apache Kafka 代理连接到 MongoDB Atlas,然后使用 CDC 解决方案将数据从 MongoDB Atlas 流式传输到 SingleStore Kai。我们还将使用 Metabase 为 SingleStore Kai 创建一个简单的分析仪表板。

介绍

CDC 是一种跟踪数据库或系统中发生的更改的方法。SingleStore 现在提供了与 MongoDB 配合使用的 CDC 解决方案。

为了演示 CDC 解决方案,我们将使用Kafka 代理将数据流式传输到 MongoDB Atlas 集群,然后使用 CDC 管道将数据从 MongoDB Atlas 传播到 SingleStore Kai。我们还将使用 Metabase 创建一个简单的分析仪表板。

图 1 显示了我们系统的高级架构。

高层架构

图 1. 高级架构(来源:SingleStore)。

我们将在以后的文章中重点介绍使用 CDC 解决方案的其他场景。

MongoDB Atlas

我们将在 M0 沙箱中使用 MongoDB Atlas。我们将在Database Access下配置具有atlasAdmin权限的管理员用户。我们将暂时允许从网络访问下的任何地方(IP 地址 0.0.0.0/0)进行访问。我们将记下用户名密码主机

Apache Kafka

我们将配置 Kafka 代理将数据流式传输到MongoDB Atlas中。我们将使用 Jupyter Notebook 来实现此目的。

首先,我们将安装一些库:

!pip install pymongo kafka-python --quiet

接下来,我们将连接到 MongoDB Atlas 和 Kafka 代理:

from kafka import KafkaConsumer
from pymongo import MongoClienttry:client = MongoClient("mongodb+srv://<username>:<password>@<host>/?retryWrites=true&w=majority")db = client.adtechprint("Connected successfully")
except:print("Could not connect")consumer = KafkaConsumer("ad_events",bootstrap_servers = ["public-kafka.memcompute.com:9092"]

我们将用我们之前从 MongoDB Atlas 保存的值替换<username>,<password>和。<host>

最初,我们将 100 条记录加载到 MongoDB Atlas 中,如下所示:

MAX_ITERATIONS = 100for iteration, message in enumerate(consumer, start = 1):if iteration > MAX_ITERATIONS:breaktry:record = message.value.decode("utf-8")user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("\t"))events_record = {"user_id": int(user_id),"event_name": event_name,"advertiser": advertiser,"campaign": int(campaign.split()[0]),"gender": gender,"income": income,"page_url": page_url,"region": region,"country": country}db.events.insert_one(events_record)except Exception as e:print(f"Iteration {iteration}: Could not insert data - {str(e)}")

数据应该成功加载,我们应该看到一个名为 的数据库,adtech其中包含一个名为 的集合events。集合中的文档在结构上应类似于以下示例:

_id: ObjectId('64ec906d0e8c0f7bcf72a8ed')
user_id: 3857963415
event_name: "Impression"
advertiser: "Sherwin-Williams"
campaign: 13
gender: "Female"
income: "25k and below",
page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/"
region: "Michigan"
country: "US"
这些文档代表广告活动事件。该events集合存储 的详细信息advertiser以及campaign有关用户的各种人口统计信息,例如genderincome

SingleStore Kai

上一篇文章介绍了创建免费 SingleStoreDB 云帐户的步骤。我们将使用以下设置:

  • 工作区组名称: CDC 演示组
  • 云提供商: AWS
  • 区域:美国东部 1(弗吉尼亚北部)
  • 工作区名称: cdc-demo
  • 尺码: S-00
  • 设置:
    - SingleStore Kai 选择

一旦工作区可用,我们将记下密码主机该主机可从CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host获取。稍后我们将需要元数据库的此信息。我们还将通过在CDC 演示组 > 防火墙下配置防火墙来暂时允许从任何地方进行访问。

从左侧导航窗格中,我们选择DEVELOP > SQL Editor来创建adtech数据库link,如下所示:

CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;DROP LINK adtech.link;CREATE LINK adtech.link AS MONGODB
CONFIG '{"mongodb.hosts": "<primary>:27017, <secondary>:27017, <secondary>:27017","collection.include.list": "adtech.*","mongodb.ssl.enabled": "true","mongodb.authsource": "admin","mongodb.members.auto.discover": "false"}'
CREDENTIALS '{"mongodb.user": "<username>","mongodb.password": "<password>"}';CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO;
我们将用我们之前从 MongoDB Atlas 保存的值替换<username>和。<password>我们还需要将<primary><secondary>和的值替换<secondary>为 MongoDB Atlas 中每个值的完整地址。

我们现在将检查是否有任何表,如下所示:

SHOW TABLES;

这应该显示一张名为events

+------------------+
| Tables_in_adtech |
+------------------+
| events           |
+------------------+

我们将检查表的结构:

DESCRIBE events;

输出应如下所示:

+-------+------+------+------+---------+-------+
| Field | Type | Null | Key  | Default | Extra |
+-------+------+------+------+---------+-------+
| _id   | text | NO   | UNI  | NULL    |       |
| _more | JSON | NO   |      | NULL    |       |
+-------+------+------+------+---------+-------+

接下来,我们将检查是否有pipelines

SHOW PIPELINES;

这将显示events当前调用的一个管道Stopped

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Stopped | False     |
+---------------------+---------+-----------+

现在我们将启动events管道:

START ALL PIPELINES;

并且状态应更改为Running

+---------------------+---------+-----------+
| Pipelines_in_adtech | State   | Scheduled |
+---------------------+---------+-----------+
| events              | Running | False     |
+---------------------+---------+-----------+

如果我们现在运行以下命令:

SELECT COUNT(*) FROM events;

它应该返回 100 作为结果:

+----------+
| COUNT(*) |
+----------+
|      100 |
+----------+

我们将检查表中的一行events,如下所示:

SELECT * FROM events LIMIT 1;

输出应类似于以下内容:

+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| _id                                  | _more                                                                                                                                                                                                                                                                   |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| {"$oid": "64ec906d0e8c0f7bcf72a8f7"} | {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416} |
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

CDC 解决方案已成功连接到 MongoDB Atlas 并将所有 100 条记录复制到 SingleStore Kai。

现在让我们使用 Metabase 创建一个仪表板。

元数据库

上一篇文章描述了如何安装、配置和创建元数据库连接的详细信息。我们将使用前一篇文章中使用的查询的细微变化来创建可视化。

1. 活动总数

SELECT COUNT(*) FROM events;

2. 各地区活动

SELECT _more::country AS `events.country`, COUNT(_more::country) AS 'events.countofevents'
FROM adtech.events AS events
GROUP BY 1;

3. Top 5 广告商活动

SELECT _more::advertiser AS `events.advertiser`, COUNT(*) AS `events.count`
FROM adtech.events AS events
WHERE (_more::advertiser LIKE '%Subway%' OR _more::advertiser LIKE '%McDonals%' OR _more::advertiser LIKE '%Starbucks%' OR _more::advertiser LIKE '%Dollar General%' OR _more::advertiser LIKE '%YUM! Brands%' OR _more::advertiser LIKE '%Dunkin Brands Group%')
GROUP BY 1
ORDER BY `events.count` DESC;

4. 按性别和收入划分的广告访问者

SELECT *
FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASEWHEN xx.z___min_rank = xx.z___rank THEN 1ELSE 0END AS z__is_highest_ranked_cellFROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rankFROM (SELECT *, RANK() OVER (ORDER BY CASEWHEN bb.z__pivot_col_rank = 1 THEN (CASEWHEN bb.`events.count` IS NOT NULL THEN 0ELSE 1END)ELSE 2END, CASEWHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count`ELSE NULLEND DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rankFROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASEWHEN ww.`events.gender` IS NULL THEN 1ELSE 0END, ww.`events.gender`) AS z__pivot_col_rankFROM (SELECT _more::gender AS `events.gender`, _more::income AS `events.income`, COUNT(*) AS `events.count`FROM adtech.events AS eventsWHERE (_more::income <> 'unknown' OR _more::income IS NULL)GROUP BY 1, 2) ww) bbWHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz
WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1)
ORDER BY zz.z___pivot_row_rank;

图 2 显示了 AdTech 仪表板上图表大小和位置的示例。我们将自动刷新选项设置为 1 分钟。

图 2.最终仪表板。

图 2.最终仪表板。

如果我们通过更改 使用 Jupyter Notebook 将更多数据加载到 MongoDB Atlas 中  MAX_ITERATIONS,我们将看到数据传播到 SingleStore Kai 以及 AdTech 仪表板中反映的新数据。

总结

在本文中,我们创建了一个 CDC 管道,以使用 SingleStore Kai 增强 MongoDB Atlas。正如多个基准测试所强调的那样,SingleStore Kai 因其卓越的性能而可用于分析。我们还使用 Metabase 创建了一个快速的可视化仪表板,以帮助我们深入了解我们的广告活动。


作者:Akmal Chaudhri ​

更多技术干货请关注公号【云原生数据库

squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。

irds.cn,多数据库管理平台(私有云)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/598078.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AQS原来是这么设计的,泰裤辣!

缘起 每门编程语言基本都离不开并发问题&#xff0c;Java亦如此。谈到Java的并发就离不开Doug lea老爷子贡献的juc包&#xff0c;而AQS又是juc里面的佼佼者 因此今天就一起来聊聊AQS 概念 AQS是什么&#xff0c;这里借用官方的话 Provides a framework for implementing blo…

第11课 实现桌面与摄像头叠加

在上一节&#xff0c;我们实现了桌面捕获功能&#xff0c;并成功把桌面图像和麦克风声音发送给对方。在实际应用中&#xff0c;有时候会需要把桌面与摄像头图像叠加在一起发送&#xff0c;这节课我们就来看下如何实现这一功能。 1.备份与修改 备份demo10并修改demo10为demo11…

前端跨域问题的解决思路

目录 前言 跨域问题的解决思路 一般跨域的解决方案 前言 做了一个简单页面&#xff0c;做了一些数据埋点&#xff0c;想通过企业微信机器人来推送数据&#xff0c;遇到了一些问题&#xff0c;顺便记录下。 跨域问题的解决思路 由于是项目比较简单&#xff0c;直接使用了aj…

《Effective C++》《Resource Management》

文章目录 13、term13:Use objects to manage resources14、term14:Think carefully about copying behavior in resource-managing classes15、term15:Provide access to raw resources in resource-managing classes法一&#xff1a; 使用智能指针的get进行显示转换法二&#…

Redis 连接 命令

目录 1.Redis Echo 命令 - 打印字符串简介语法可用版本: > 1.0.0返回值: 返回字符串本身。 示例 2.Redis Select 命令 - 切换到指定的数据库简介语法可用版本: > 1.0.0返回值: 总是返回 OK 。 示例 3.Redis Ping 命令 - 查看服务是否运行简介语法可用版本: > 1.0.0返回…

Apache 网页优化

目录 1.网页压缩与缓存 1.1 网页压缩 1. gzip 介绍 2. Http的压缩过程 3. Apache的压缩模块 4. mod_deflate模块 1.2 网页缓存 1. 配置 mod_expires 模块启用 2. 隐藏版本信息 2.1 配置Apache隐藏版本信息 2.2 Apache 防盗链 1. 配置防盗链 2.检查是否安装mod_re…

景联文科技GPT教育题库:AI教育大模型的强大数据引擎

GPT-4发布后&#xff0c;美国奥数队总教练、卡耐基梅隆大学数学系教授罗博认为&#xff0c;这个几乎是用“刷题”方式喂大的AI教育大模型的到来&#xff0c;意味着人类的刷题时代即将退出历史舞台。 未来教育将更加注重学生的个性化需求和多元化发展&#xff0c;借助GPT和AI教育…

你想过在 C++ 中使用契约和反射特性吗?

以下内容为本人的学习笔记&#xff0c;如需要转载&#xff0c;请声明原文链接微信公众号「ENG八戒」https://mp.weixin.qq.com/s/fOEG22dQqKSpsZmk8z6w6g ISO/IEC C 技术委员会主持人 Herb Sutter 报告称&#xff0c;C26 将具有新的功能&#xff0c;包括契约和反射。 该委员会…

Java异常简单介绍

文章目录 1. 异常分类和关键字1.1 分类1.2 关键字 2. Error2.1 Error定义2.2 常见的Error2.2.1 VirtualMachineError2.2.2 ThreadDeath2.2.3 LinkageError2.2.4 AssertionError2.2.5 InternalError2.2.6 OutOfMemoryError2.2.6.1 OOM原因2.2.6.2 OutOfMemoryError会导致宕机吗 …

大创项目推荐 深度学习卫星遥感图像检测与识别 -opencv python 目标检测

文章目录 0 前言1 课题背景2 实现效果3 Yolov5算法4 数据处理和训练5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **深度学习卫星遥感图像检测与识别 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐…

Leetcode算法系列| 11. 盛最多水的容器

目录 1.题目2.题解C# 解法一&#xff1a;暴力C# 解法二&#xff1a;双指针&#xff08;左指针大于右指针&#xff0c;left&#xff09;C# 解法三&#xff1a;双指针优化&#xff08;左指针小于等于最小高度&#xff0c;left&#xff09;Java 解法一&#xff1a;双指针Python3 解…

经常使用耳机对耳朵听力有影响吗?戴哪种耳机不伤耳朵听力?

经常使用耳机容易引起末梢感受器官受损&#xff0c;可能造成内耳功能损伤&#xff0c;出现耳聋、耳鸣等听力的适应性下降的症状&#xff0c;建议使用耳机时间不要过长&#xff0c;并且音量不要过大。如果想保护听力的话&#xff0c;建议选择骨传导耳机&#xff0c;骨传导耳机通…

单片机快速入门

参考连接&#xff1a; 安装MinGW-64&#xff08;在win10上搭建C/C开发环境&#xff09;https://zhuanlan.zhihu.com/p/85429160MinGW-64; 链接&#xff1a;https://pan.baidu.com/s/1oE1FmjyK7aJPnDC8vASmCg?pwdy1mz 提取码&#xff1a;y1mz --来自百度网盘超级会员V7的分享C…

rotate-captcha-crack项目重新训练百度旋转验证码角度预测模型

参考&#xff1a; building-powerful-image-classification-models-using-very-little-data.html https://github.com/Starry-OvO/rotate-captcha-crack &#xff08;主&#xff09;作者思路&#xff1a;https://www.52pojie.cn/thread-1754224-1-1.html 纠正 新版百度、百家…

低成本总线技术——LIN总线协议规范介绍

关注菲益科公众号—>对话窗口发送 “CANoe ”或“INCA”&#xff0c;即可获得canoe入门到精通电子书和INCA软件安装包&#xff08;不带授权码&#xff09;下载地址。 本篇文章主要介绍LIN总线协议规范。 数据帧的结构 LIN的数据帧包括报头&#xff0c;响应两大部分。而报头…

Visual Studio 2013 “即将退休”

新年快乐&#xff01; 这也是向各位开发者提醒 Visual Studio 支持生命周期中即将到来的好时机。 对 Visual Studio 2013 的支持即将在今年(2024年)的4月9日结束。如果你正在使用旧版本的 Visual Studio&#xff0c;我们强烈建议您升级您的开发环境到最新的 Visual Studio 20…

浏览器是如何渲染页面的

浏览器是个极其复杂的程序&#xff0c;这里只是挑几个和前端息息相关的重要内容来说 在学习如何渲染之前需要知道一个浏览器浏览器会有多个进程&#xff0c;其中主要进程有浏览器进程&#xff0c;网络进程&#xff0c;渲染进程这里我们主要学习内容就发生在渲染进程。当渲染进程…

【解决openGauss无法使用gs_check等服务器端命令问题】

【解决openGauss无法使用gs_check等服务器端命令问题】 一、问题描述二、问题原因三、解决方法 一、问题描述 [ommopengauss03 ~]$ gs_check -i CheckCPU Parsing the check items config file successfully [GAUSS-53026]: ERROR: Execute SSH command on host 192.168.56.19…

给出一句话来描述想要的图片,就能从图库中搜出来符合要求的

介绍 地址&#xff1a;https://github.com/mazzzystar/Queryable The open-source code of Queryable, an iOS app, leverages the OpenAIs CLIP model to conduct offline searches in the Photos album. Unlike the category-based search model built into the iOS Photos…

项目使用PowerJob

新一代的定时任务框架——PowerJob 简介 PowerJob是基于java开发的企业级的分布式任务调度平台&#xff0c;与xxl-job一样&#xff0c;基于web页面实现任务调度配置与记录&#xff0c;使用简单&#xff0c;上手快速&#xff0c;其主要功能特性如下&#xff1a; 使用简单&…