使用AWS Glue与AWS Kinesis构建的流式ETL作业(二)——数据处理

大纲

  • 2 数据处理
    • 2.1 架构
    • 2.2 AWS Glue连接和创建
      • 2.2.1 创建AWS RedShift连接
      • 2.2.2 创建AWS RDS连接(以PG为例)
    • 2.3 创建AWS Glue Job
    • 2.4 编写脚本
        • 2.4.1 以AWS RedShift为例
        • 2.4.2 以PG为例
    • 2.5 运行脚本

2 数据处理

2.1 架构

在这里插入图片描述

2.2 AWS Glue连接和创建

下文中提供了AWS RedShift和PG数据库的连接创建过程,在实际使用中我们可以二选一。

2.2.1 创建AWS RedShift连接

前置条件:需要有一个AWS RedShift。
AWS RedShift具体的创建过程本文不表。需要注意的是:RedShift需要创建一个终端节点,具体的方法请看《Glue连接RedShift的前置条件:创建终端节点》
由于Glue Job 在运行的时候,是在独立的服务器上,因此不能直接访问到私有子网中的服务。于是借助Glue连接,可以使得Job在运行时连接AWS服务。

步骤图例
1、创建连接在这里插入图片描述
2、输入连接名称,连接类型选择“Amazon Redshift”在这里插入图片描述
3、选择RedShift集群,输入配置的用户名密码在这里插入图片描述
4、审核在这里插入图片描述
5、测试连接在这里插入图片描述

2.2.2 创建AWS RDS连接(以PG为例)

步骤图例
1、入口在这里插入图片描述
2 、输入连接名称,连接类型选择“JDBC”在这里插入图片描述
3、输入连接PG的JDBC,语法请看使用连接,输入用户名密码,VPC和子网需要选择PG的VPC和子网。并且至少一个选定的安全组必须为所有 TCP 端口指定自引用入站规则在这里插入图片描述
4、检查无误后完成在这里插入图片描述

2.3 创建AWS Glue Job

步骤图例
1、入口在这里插入图片描述
2、若要在Job中引入其他python包,请在安全配置里面添加作业参数:–additional-python-modules:SQLAlchemy== 1.3.16,psycopg2-binary==2.8.5(值请自定义)在这里插入图片描述
3在这里插入图片描述

2.4 编写脚本

2.4.1 以AWS RedShift为例
import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import *import json
import datetime
import timeargs = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
sourceData = glueContext.create_data_frame.from_catalog( \database = "【你的数据库名称】", \ table_name = "【表的名称】", \transformation_ctx = "datasource0", \additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})LINCAN_SHARE_PAGE = "lincan_share_page"share_page_dot = [("url","string","url","string"),("ip","string","ip","string"),("country","string","country","string"),("city","string","city","string"),("user_id","string","user_id","string"),  ("dot_type","string","dot_type","string"), ("header","string","header","string"),  ("header_appname","string","header_appname","string"),  ("header_ismobile","string","header_ismobile","string"),  ("header_appversion","string","header_appversion","string"),  ("header_useragent","string","header_useragent","string"),("content","string","content","string"),  ("content_url","string","content_url","string"),  ("content_index","string","content_index","string"),  ("content_title","string","content_title","string"),  ("create_time","string","create_time","timestamp")]def get_connect(table):connection_options = {"url": "jdbc:redshift://【RedShift终端节点】:5439/dev","user": "【RedShift用户名】",'database': '【数据库】', "password": "【密码】","dbtable":table,"redshiftTmpDir":  args["TempDir"]} return connection_optionsclass Handle:def __init__(self,dynamic_frame):self._time_now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")self.dynamic_frame = dynamic_framedef run(self):raiseclass SharePageDot(Handle):def run(self):print("start share_page dot")def handle(rec):message = eval(rec["logEvents.val.message"])message["header_appname"] = message["header"]["appName"]message["header_ismobile"] = message["header"]["isMobile"]message["header_appversion"] = message["header"]["appVersion"]message["header_useragent"] = message["header"]["userAgent"]message["header"] = json.dumps(message["header"])message["content_url"] = message["content"]["url"]message["content_index"] = message["content"]["index"]message["content_title"] = message["content"]["title"]message["content"] = json.dumps(message["content"])message["create_time"] = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")# message["create_time"] = time.time()return messageself.dynamic_frame.printSchema()mapped_dyF =  Map.apply(frame = self.dynamic_frame, f = handle)mapped_dyF.printSchema()if not mapped_dyF:returnapplymapping0 = ApplyMapping.apply(frame = mapped_dyF, mappings = share_page_dot, transformation_ctx = "applymapping0")datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping0, catalog_connection = "redshift", connection_options = get_connect("share_page_dot"),redshift_tmp_dir= args["TempDir"], transformation_ctx="datasink1")print("end share_page dot")def processBatch(data_frame, batchId):if (data_frame.count() > 0):print("start")logEvents = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame").select_fields("logEvents")dyf_relationize  = logEvents.relationalize("logEvents",args["TempDir"]+"/relationalize")dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'logEvents_logEvents')# 筛选sac_or_mon_dyF = Filter.apply(frame = dyf_selectFromCollection, f = lambda x: x["logEvents.val.id"]!="")# page dotpage_dot = Filter.apply(frame = sac_or_mon_dyF, f = lambda x: eval(x["logEvents.val.message"])["dot_type"]==LINCAN_PAGE_DOT)# active dotactive_dot = Filter.apply(frame = sac_or_mon_dyF, f = lambda x: eval(x["logEvents.val.message"])["dot_type"]==LINCAN_ACTIVE_DOT)# share page dotshare_page = Filter.apply(frame = sac_or_mon_dyF, f = lambda x: eval(x["logEvents.val.message"])["dot_type"]==LINCAN_SHARE_PAGE)#if page_dot.count()>0 :#    PageDot(page_dot).run()#if active_dot.count()>0:#   ActiveDot(active_dot).run()if share_page.count()>0:SharePageDot(share_page).run()print("end")glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds",  "checkpointLocation": args["TempDir"] + "/checkpoint/"})
job.commit()
2.4.2 以PG为例

说明:在此脚本中,引入了python其他的包。写入PG使用的是sqlalchemy,是为了实现有则更新,无则写入的操作。若无特殊要求,可参考 “2.4.1”

import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import *
import boto3
import json
import datetime
import time
import sqlalchemy
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.dialects import postgresql
import threading
import copy
import datetime
args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)sourceData = glueContext.create_data_frame.from_catalog( \database = "【你的数据库名称】", \ table_name = "【表的名称】", \transformation_ctx = "datasource0", \additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})class DBWrite:url = 'postgresql+psycopg2://【user+passsword@PG终端节点】:5432/crowd'base = Nonesession = Noneengine = None@classmethoddef execute(cls):cls.engine = sqlalchemy.create_engine(cls.url)metadata = sqlalchemy.schema.MetaData(bind=cls.engine)metadata.reflect(cls.engine, schema = "public")cls.base = automap_base(metadata = metadata)cls.base.prepare()cls.session = sessionmaker(bind = cls.engine)DBWrite.execute()
def processBatch(data_frame, batchId):if (data_frame.count() > 0):print("start",str(datetime.datetime.now()))session = DBWrite.session()def save_to_pg(row):row = json.loads(row)insert_data = {"create_time": row["create_time"],"update_time": row["update_time"],"valid": True,"crowd_type": row["crowd_type"],"name": row["name"],"support_num": row["support_num"],"target_amount": row["target_amount"],"status": row["status"],"surplus_day": row["surplus_day"],"crowd_category": row["crowd_category"],"current_amount": row["current_amount"],"address": row["address"],"author": row["author"],"image": row["image"],"comment_num": row["comment_num"],"create_data": row["create_time"][:-9],"unique_key":row["unique_key"]}update_data = copy.deepcopy(insert_data)del update_data["create_time"]del update_data["create_data"]Crowd = DBWrite.base.classes.crowd_crowdinsert_stmt = insert(Crowd).values(**insert_data)insert_stmt = insert_stmt.on_conflict_do_update(index_elements = ["unique_key"],set_ = update_data)session.execute(insert_stmt)def handle(rec):message = rec["logEvents.val.message"]index = message.find("|312F14DS|")message = message[index+10:]message = message.replace(": true", ": True").replace(": false", ": False")message = eval(message)return messagelogEvents = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame").select_fields("logEvents")dyf_relationize = logEvents.relationalize("logEvents", args["TempDir"]+"/relationalize")dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'logEvents_logEvents')# 筛选sac_or_mon_dyF = Filter.apply(frame=dyf_selectFromCollection, f=lambda x: x["logEvents.val.id"] != "")mapped_dyF = Map.apply(frame=sac_or_mon_dyF, f=handle)mapped_dyF = mapped_dyF.toDF()for info in (mapped_dyF.toJSON().take(mapped_dyF.toJSON().count()+1)):save_to_pg(info)session.commit()session.close()print("end",str(datetime.datetime.now()))glueContext.forEachBatch(frame=sourceData, batch_function=processBatch, options={"windowSize": "100 seconds",  "checkpointLocation": args["TempDir"] + "/checkpoint/"})
job.commit()

2.5 运行脚本

我们创建的是Spark Stream 类型的Job,因此Job会一直运行。定时的从AWS Kinesis Data Stream中获取数据进行微批量处理。
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/200712.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++基础 -42- STL库之list链表

list链表的格式(需要定义头文件) list<int> data1(4, 100);list<int> data2(4, 500);list链表的合并接口 举例使用合并接口并且验证 data2.merge(data1);list<int>::iterator ccc;for (ccc data2.begin(); ccc ! data2.end(); ccc){cout << *ccc …

算法学习系列(五):N皇后、数独

目录 引言一、N皇后问题代码实现测试 二、数独问题代码实现测试 引言 这个N皇后问题是很典型的一个递归问题&#xff0c;就是还是要掌握&#xff0c;所谓递归其实就是dfs&#xff0c;一层一层深入下去。数独和N皇后的思路是一样的&#xff0c;只不过一些细节不同而已。 一、N…

TFIDF、BM25、编辑距离、倒排索引

TFIDF TF刻画了词语t对某篇文档的重要性&#xff0c;IDF刻画了词语t对整个文档集的重要性

2024年API安全趋势预测

目录 1.API漏洞的渗透性 2.标准框架的局限性 3.防止漏洞 4.不断上升的威胁和战略建议 案例分析 2024年的潜在威胁 驾驭不断演变的API安全格局 在接下来的部分中&#xff0c;我们将更深入地研究这些趋势&#xff0c;探索标准框架在应对这些新出现的威胁方面的局限性…

数据库学习日常案例20231203-Mysql高级 -- 日志管理篇

Mysql高级 -- 日志篇 *日志类型 1.mysql的6类日志&#xff1a; 2.日志的弊端 *慢查询日志(slow query log) *通用查询日志(general query log) 1.作用&#xff1a; 2.问题场景&#xff1a; 3.查看当前状态 &#xff1a; 4.启动日志&#xff1a; 方式1&#xff1a;永久…

【GO】记一次排查 docker virtual size 过大问题

起因 运维反馈给我&#xff0c;我的容器 docker virtual size 过大&#xff0c;就没听过这个东西。 使用 docker ps -s 会发现比 docker ps 要多点东西&#xff0c;最后一栏的 size。这个 size 是镜像大小加上你写到运行容器里面文件的大小。 问题排查 容器内服务对落盘文件…

云计算在数字营销中的作用是什么?

营销策略和云计算是一个为企业提供多种优势的系统。它使他们能够取得更大的成功&#xff0c;同时提高产量。这样做的原因是&#xff0c;可以从任何位置远程使用云集成工具和应用程序。基本上&#xff0c;该系统增强了存储设备和传播。同时&#xff0c;它减轻了公司 IT 网络的压…

【Kubernetes】可视化UI界面Dashboard

安装和配置k8s可视化UI界面 一、安装Dashboard1.1、上传镜像并解压1.2、安装dashboard组件1.3、修改service1.4、访问dashboard 二、通过Token令牌访问Dashboard2.1、创建clusterrolebinding2.2、获取token2.3、使用token登录 三、通过kubeconfig文件访问Dashboard3.1、创建clu…

21、pytest参数化中标记单独的测试用例

官方实例 # content of test_expectation_xfail import pytestpytest.mark.parametrize("test_input, expected",[("35",8),("24",6),pytest.param("6*9",42,markspytest.mark.xfail)], ) def test_eval(test_input, expected):asser…

深入理解ConcurrentHashMap源码解析

ConcurrentHashMap是Java中一个非常重要的并发集合类&#xff0c;它提供了线程安全的哈希表实现。其初衷是为了优化同步HashMap&#xff0c;减少线程竞争&#xff0c;提高并发访问效率。随着Java的发展&#xff0c;ConcurrentHashMap在1.7和1.8中经历了显著的变化。以下内容将深…

基于ssm vue技术的品牌银饰售卖平台源码和论文737

摘 要 本论文主要是针对品牌银饰售卖而开发进行概述&#xff0c;主要包括对研究的背景和研究现状&#xff0c;以及研究目的等的阐述&#xff0c;也对该系统的各种功能要求&#xff0c;对系统结构&#xff0c;数据库的设计等进行讨论。随着科技与技术的发展&#xff0c;利用计…

Qt篇——QChartView实现鼠标滚轮缩放、鼠标拖拽平移、鼠标双击重置缩放平移、曲线点击显示坐标

话不多说。 第一步&#xff1a;自定义QChartView&#xff0c;直接搬 FirtCurveChartView.h #ifndef FITCURVECHARTVIEW_H #define FITCURVECHARTVIEW_H #include <QtCharts>class FitCurveChartView : public QChartView {Q_OBJECTpublic:FitCurveChartView(QWidget *…

MySQL 8.x 自签证书通过keytool和openssl转成JKS文件

一、写在前面 数据库MySQL 8.0 通过自签命令在datadir下生成了所有的证书文件。由于Java的JDK不支持直接加载PEM格式的证书&#xff0c;所以需要将PEM格式证书转换成Java能够直接加载的JKS格式证书。我们需要将根证书ca.pem转换成JKS格式的根证书truststore.jks&#xff0c;将…

把 Windows 11 装进移动硬盘:Windows 11 To Go

本篇文章聊聊如何制作一个可以“说带走就带走”的 Windows 操作系统&#xff0c;将 Windows11 做成能够放在 U 盘或者移动硬盘里的 WinToGo “绿色软件”。 写在前面 在《开源的全能维护 U 盘工具&#xff1a;Ventoy》这篇文章的最后&#xff0c;我提到了一个关键词 “WinToG…

html之JS

1、JS的引入 <!-- 内嵌 --><!-- <script>alert(4)</script> --><!-- 外引 --><!-- 内嵌和外引同时有的时候&#xff0c;内嵌被覆盖 --><script src"js/index.js" defer></script>//defer 延迟执行 2、js的变量使用…

基于IText7 PDF模板填充?

引入依赖 <dependency><groupId>com.itextpdf</groupId><artifactId>itext7-core</artifactId><version>8.0.1</version><type>pom</type> </dependency> <dependency><groupId>com.itextpdf</gr…

Mybatis XML 配置文件

我们刚开始就有说Mybatis 的开发有两种方式: 1.注释 2.XML 注解和 XML 的方式是可以共存的 我们前面说的都是注释的方式,接下来是XML方式 XML的方式分为三步 : 1.配置数据库(配在 application.yml 里面) 这个跟注释的配置是一样的,username应该都是一样的,password记得写…

1. 小游戏(贪心)

题干&#xff1a; 谷同学很喜欢玩计算机游戏&#xff0c;特别是战略游戏&#xff0c;但是有时他不能尽快找到解所以常常感到很沮丧。现在面临如下问题&#xff1a;他必须在一个中世纪的城堡里设防&#xff0c;城堡里的道路形成一棵无向树。要在结点上安排最少的士兵使得他们可以…

MySql配置主从服务器复制数据库数据

主从服务器MySql版本尽可能一致。如果不一致的情况此文没考虑。 主服务器必须开启二进制日志文件。查看是否开启&#xff1a; SHOW VARIABLES LIKE log_bin; 开启方法&#xff0c;只需要在配置文件添加如下配置&#xff1a; [mysqld] log-binmysql-bin 1、MySql主服务器…

6.Eclipse里下载Subclipse插件

方法一&#xff1a;从Eclipse Marketplace里面下载 具体操作&#xff1a;打开Eclipse --> Help --> Eclipse Marketplace --> 在Find中输入subclipse搜索 --> 找到subclipse点击install 方法二&#xff1a;从Install New Software里下载 具体操作&#xff1a;打开…