使用AWS Glue与AWS Kinesis构建的流式ETL作业(二)——数据处理

大纲

  • 2 数据处理
    • 2.1 架构
    • 2.2 AWS Glue连接和创建
      • 2.2.1 创建AWS RedShift连接
      • 2.2.2 创建AWS RDS连接(以PG为例)
    • 2.3 创建AWS Glue Job
    • 2.4 编写脚本
        • 2.4.1 以AWS RedShift为例
        • 2.4.2 以PG为例
    • 2.5 运行脚本

2 数据处理

2.1 架构

在这里插入图片描述

2.2 AWS Glue连接和创建

下文中提供了AWS RedShift和PG数据库的连接创建过程,在实际使用中我们可以二选一。

2.2.1 创建AWS RedShift连接

前置条件:需要有一个AWS RedShift。
AWS RedShift具体的创建过程本文不表。需要注意的是:RedShift需要创建一个终端节点,具体的方法请看《Glue连接RedShift的前置条件:创建终端节点》
由于Glue Job 在运行的时候,是在独立的服务器上,因此不能直接访问到私有子网中的服务。于是借助Glue连接,可以使得Job在运行时连接AWS服务。

步骤图例
1、创建连接在这里插入图片描述
2、输入连接名称,连接类型选择“Amazon Redshift”在这里插入图片描述
3、选择RedShift集群,输入配置的用户名密码在这里插入图片描述
4、审核在这里插入图片描述
5、测试连接在这里插入图片描述

2.2.2 创建AWS RDS连接(以PG为例)

步骤图例
1、入口在这里插入图片描述
2 、输入连接名称,连接类型选择“JDBC”在这里插入图片描述
3、输入连接PG的JDBC,语法请看使用连接,输入用户名密码,VPC和子网需要选择PG的VPC和子网。并且至少一个选定的安全组必须为所有 TCP 端口指定自引用入站规则在这里插入图片描述
4、检查无误后完成在这里插入图片描述

2.3 创建AWS Glue Job

步骤图例
1、入口在这里插入图片描述
2、若要在Job中引入其他python包,请在安全配置里面添加作业参数:–additional-python-modules:SQLAlchemy== 1.3.16,psycopg2-binary==2.8.5(值请自定义)在这里插入图片描述
3在这里插入图片描述

2.4 编写脚本

2.4.1 以AWS RedShift为例
import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import *import json
import datetime
import timeargs = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
sourceData = glueContext.create_data_frame.from_catalog( \database = "【你的数据库名称】", \ table_name = "【表的名称】", \transformation_ctx = "datasource0", \additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})LINCAN_SHARE_PAGE = "lincan_share_page"share_page_dot = [("url","string","url","string"),("ip","string","ip","string"),("country","string","country","string"),("city","string","city","string"),("user_id","string","user_id","string"),  ("dot_type","string","dot_type","string"), ("header","string","header","string"),  ("header_appname","string","header_appname","string"),  ("header_ismobile","string","header_ismobile","string"),  ("header_appversion","string","header_appversion","string"),  ("header_useragent","string","header_useragent","string"),("content","string","content","string"),  ("content_url","string","content_url","string"),  ("content_index","string","content_index","string"),  ("content_title","string","content_title","string"),  ("create_time","string","create_time","timestamp")]def get_connect(table):connection_options = {"url": "jdbc:redshift://【RedShift终端节点】:5439/dev","user": "【RedShift用户名】",'database': '【数据库】', "password": "【密码】","dbtable":table,"redshiftTmpDir":  args["TempDir"]} return connection_optionsclass Handle:def __init__(self,dynamic_frame):self._time_now = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")self.dynamic_frame = dynamic_framedef run(self):raiseclass SharePageDot(Handle):def run(self):print("start share_page dot")def handle(rec):message = eval(rec["logEvents.val.message"])message["header_appname"] = message["header"]["appName"]message["header_ismobile"] = message["header"]["isMobile"]message["header_appversion"] = message["header"]["appVersion"]message["header_useragent"] = message["header"]["userAgent"]message["header"] = json.dumps(message["header"])message["content_url"] = message["content"]["url"]message["content_index"] = message["content"]["index"]message["content_title"] = message["content"]["title"]message["content"] = json.dumps(message["content"])message["create_time"] = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")# message["create_time"] = time.time()return messageself.dynamic_frame.printSchema()mapped_dyF =  Map.apply(frame = self.dynamic_frame, f = handle)mapped_dyF.printSchema()if not mapped_dyF:returnapplymapping0 = ApplyMapping.apply(frame = mapped_dyF, mappings = share_page_dot, transformation_ctx = "applymapping0")datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping0, catalog_connection = "redshift", connection_options = get_connect("share_page_dot"),redshift_tmp_dir= args["TempDir"], transformation_ctx="datasink1")print("end share_page dot")def processBatch(data_frame, batchId):if (data_frame.count() > 0):print("start")logEvents = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame").select_fields("logEvents")dyf_relationize  = logEvents.relationalize("logEvents",args["TempDir"]+"/relationalize")dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'logEvents_logEvents')# 筛选sac_or_mon_dyF = Filter.apply(frame = dyf_selectFromCollection, f = lambda x: x["logEvents.val.id"]!="")# page dotpage_dot = Filter.apply(frame = sac_or_mon_dyF, f = lambda x: eval(x["logEvents.val.message"])["dot_type"]==LINCAN_PAGE_DOT)# active dotactive_dot = Filter.apply(frame = sac_or_mon_dyF, f = lambda x: eval(x["logEvents.val.message"])["dot_type"]==LINCAN_ACTIVE_DOT)# share page dotshare_page = Filter.apply(frame = sac_or_mon_dyF, f = lambda x: eval(x["logEvents.val.message"])["dot_type"]==LINCAN_SHARE_PAGE)#if page_dot.count()>0 :#    PageDot(page_dot).run()#if active_dot.count()>0:#   ActiveDot(active_dot).run()if share_page.count()>0:SharePageDot(share_page).run()print("end")glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds",  "checkpointLocation": args["TempDir"] + "/checkpoint/"})
job.commit()
2.4.2 以PG为例

说明:在此脚本中,引入了python其他的包。写入PG使用的是sqlalchemy,是为了实现有则更新,无则写入的操作。若无特殊要求,可参考 “2.4.1”

import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import *
import boto3
import json
import datetime
import time
import sqlalchemy
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.dialects import postgresql
import threading
import copy
import datetime
args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)sourceData = glueContext.create_data_frame.from_catalog( \database = "【你的数据库名称】", \ table_name = "【表的名称】", \transformation_ctx = "datasource0", \additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})class DBWrite:url = 'postgresql+psycopg2://【user+passsword@PG终端节点】:5432/crowd'base = Nonesession = Noneengine = None@classmethoddef execute(cls):cls.engine = sqlalchemy.create_engine(cls.url)metadata = sqlalchemy.schema.MetaData(bind=cls.engine)metadata.reflect(cls.engine, schema = "public")cls.base = automap_base(metadata = metadata)cls.base.prepare()cls.session = sessionmaker(bind = cls.engine)DBWrite.execute()
def processBatch(data_frame, batchId):if (data_frame.count() > 0):print("start",str(datetime.datetime.now()))session = DBWrite.session()def save_to_pg(row):row = json.loads(row)insert_data = {"create_time": row["create_time"],"update_time": row["update_time"],"valid": True,"crowd_type": row["crowd_type"],"name": row["name"],"support_num": row["support_num"],"target_amount": row["target_amount"],"status": row["status"],"surplus_day": row["surplus_day"],"crowd_category": row["crowd_category"],"current_amount": row["current_amount"],"address": row["address"],"author": row["author"],"image": row["image"],"comment_num": row["comment_num"],"create_data": row["create_time"][:-9],"unique_key":row["unique_key"]}update_data = copy.deepcopy(insert_data)del update_data["create_time"]del update_data["create_data"]Crowd = DBWrite.base.classes.crowd_crowdinsert_stmt = insert(Crowd).values(**insert_data)insert_stmt = insert_stmt.on_conflict_do_update(index_elements = ["unique_key"],set_ = update_data)session.execute(insert_stmt)def handle(rec):message = rec["logEvents.val.message"]index = message.find("|312F14DS|")message = message[index+10:]message = message.replace(": true", ": True").replace(": false", ": False")message = eval(message)return messagelogEvents = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame").select_fields("logEvents")dyf_relationize = logEvents.relationalize("logEvents", args["TempDir"]+"/relationalize")dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'logEvents_logEvents')# 筛选sac_or_mon_dyF = Filter.apply(frame=dyf_selectFromCollection, f=lambda x: x["logEvents.val.id"] != "")mapped_dyF = Map.apply(frame=sac_or_mon_dyF, f=handle)mapped_dyF = mapped_dyF.toDF()for info in (mapped_dyF.toJSON().take(mapped_dyF.toJSON().count()+1)):save_to_pg(info)session.commit()session.close()print("end",str(datetime.datetime.now()))glueContext.forEachBatch(frame=sourceData, batch_function=processBatch, options={"windowSize": "100 seconds",  "checkpointLocation": args["TempDir"] + "/checkpoint/"})
job.commit()

2.5 运行脚本

我们创建的是Spark Stream 类型的Job,因此Job会一直运行。定时的从AWS Kinesis Data Stream中获取数据进行微批量处理。
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/200712.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++基础 -42- STL库之list链表

list链表的格式(需要定义头文件) list<int> data1(4, 100);list<int> data2(4, 500);list链表的合并接口 举例使用合并接口并且验证 data2.merge(data1);list<int>::iterator ccc;for (ccc data2.begin(); ccc ! data2.end(); ccc){cout << *ccc …

算法学习系列(五):N皇后、数独

目录 引言一、N皇后问题代码实现测试 二、数独问题代码实现测试 引言 这个N皇后问题是很典型的一个递归问题&#xff0c;就是还是要掌握&#xff0c;所谓递归其实就是dfs&#xff0c;一层一层深入下去。数独和N皇后的思路是一样的&#xff0c;只不过一些细节不同而已。 一、N…

TFIDF、BM25、编辑距离、倒排索引

TFIDF TF刻画了词语t对某篇文档的重要性&#xff0c;IDF刻画了词语t对整个文档集的重要性

2024年API安全趋势预测

目录 1.API漏洞的渗透性 2.标准框架的局限性 3.防止漏洞 4.不断上升的威胁和战略建议 案例分析 2024年的潜在威胁 驾驭不断演变的API安全格局 在接下来的部分中&#xff0c;我们将更深入地研究这些趋势&#xff0c;探索标准框架在应对这些新出现的威胁方面的局限性…

数据库学习日常案例20231203-Mysql高级 -- 日志管理篇

Mysql高级 -- 日志篇 *日志类型 1.mysql的6类日志&#xff1a; 2.日志的弊端 *慢查询日志(slow query log) *通用查询日志(general query log) 1.作用&#xff1a; 2.问题场景&#xff1a; 3.查看当前状态 &#xff1a; 4.启动日志&#xff1a; 方式1&#xff1a;永久…

云计算在数字营销中的作用是什么?

营销策略和云计算是一个为企业提供多种优势的系统。它使他们能够取得更大的成功&#xff0c;同时提高产量。这样做的原因是&#xff0c;可以从任何位置远程使用云集成工具和应用程序。基本上&#xff0c;该系统增强了存储设备和传播。同时&#xff0c;它减轻了公司 IT 网络的压…

【Kubernetes】可视化UI界面Dashboard

安装和配置k8s可视化UI界面 一、安装Dashboard1.1、上传镜像并解压1.2、安装dashboard组件1.3、修改service1.4、访问dashboard 二、通过Token令牌访问Dashboard2.1、创建clusterrolebinding2.2、获取token2.3、使用token登录 三、通过kubeconfig文件访问Dashboard3.1、创建clu…

21、pytest参数化中标记单独的测试用例

官方实例 # content of test_expectation_xfail import pytestpytest.mark.parametrize("test_input, expected",[("35",8),("24",6),pytest.param("6*9",42,markspytest.mark.xfail)], ) def test_eval(test_input, expected):asser…

基于ssm vue技术的品牌银饰售卖平台源码和论文737

摘 要 本论文主要是针对品牌银饰售卖而开发进行概述&#xff0c;主要包括对研究的背景和研究现状&#xff0c;以及研究目的等的阐述&#xff0c;也对该系统的各种功能要求&#xff0c;对系统结构&#xff0c;数据库的设计等进行讨论。随着科技与技术的发展&#xff0c;利用计…

Qt篇——QChartView实现鼠标滚轮缩放、鼠标拖拽平移、鼠标双击重置缩放平移、曲线点击显示坐标

话不多说。 第一步&#xff1a;自定义QChartView&#xff0c;直接搬 FirtCurveChartView.h #ifndef FITCURVECHARTVIEW_H #define FITCURVECHARTVIEW_H #include <QtCharts>class FitCurveChartView : public QChartView {Q_OBJECTpublic:FitCurveChartView(QWidget *…

MySQL 8.x 自签证书通过keytool和openssl转成JKS文件

一、写在前面 数据库MySQL 8.0 通过自签命令在datadir下生成了所有的证书文件。由于Java的JDK不支持直接加载PEM格式的证书&#xff0c;所以需要将PEM格式证书转换成Java能够直接加载的JKS格式证书。我们需要将根证书ca.pem转换成JKS格式的根证书truststore.jks&#xff0c;将…

把 Windows 11 装进移动硬盘:Windows 11 To Go

本篇文章聊聊如何制作一个可以“说带走就带走”的 Windows 操作系统&#xff0c;将 Windows11 做成能够放在 U 盘或者移动硬盘里的 WinToGo “绿色软件”。 写在前面 在《开源的全能维护 U 盘工具&#xff1a;Ventoy》这篇文章的最后&#xff0c;我提到了一个关键词 “WinToG…

Mybatis XML 配置文件

我们刚开始就有说Mybatis 的开发有两种方式: 1.注释 2.XML 注解和 XML 的方式是可以共存的 我们前面说的都是注释的方式,接下来是XML方式 XML的方式分为三步 : 1.配置数据库(配在 application.yml 里面) 这个跟注释的配置是一样的,username应该都是一样的,password记得写…

6.Eclipse里下载Subclipse插件

方法一&#xff1a;从Eclipse Marketplace里面下载 具体操作&#xff1a;打开Eclipse --> Help --> Eclipse Marketplace --> 在Find中输入subclipse搜索 --> 找到subclipse点击install 方法二&#xff1a;从Install New Software里下载 具体操作&#xff1a;打开…

OSPF浅析

一、预习&#xff1a; 1、优点&#xff1a; 是一种典型的链路状态路由协议&#xff0c;协议号89&#xff0c;把大型网络分隔为多个较小、可管理的单元&#xff1a;Area a.减少LSA泛洪范围&#xff0c;有效地把拓朴变化 控制在区域内&#xff0c;达到网络优化的目的…

nodejs微信小程序+python+PHP本科生优秀作业交流网站的设计与实现-计算机毕业设计推荐

通过软件的需求分析已经获得了系统的基本功能需求&#xff0c;根据需求&#xff0c;将本科生优秀作业交流网站功能模块主要分为管理员模块。管理员添加系统首页、个人中心、用户管理、作业分类管理、作业分享管理、论坛交流、投诉举报、系统管理等操作。 随着信息化社会的形成…

【面试HOT200】二叉树的构建二叉搜索树篇

系列综述&#xff1a; &#x1f49e;目的&#xff1a;本系列是个人整理为了秋招面试的&#xff0c;整理期间苛求每个知识点&#xff0c;平衡理解简易度与深入程度。 &#x1f970;来源&#xff1a;材料主要源于【CodeTopHot200】进行的&#xff0c;每个知识点的修正和深入主要参…

Leetcode 77 组合

题意理解&#xff1a; 给定两个整数 n 和 k&#xff0c;返回范围 [1, n] 中所有可能的 k 个数的组合。 如&#xff1a;n3,k2,则有&#xff1a;12 13 23 一般&#xff0c;我们使用回溯法来解决组合问题。 组合问题没有顺序要求&#xff0c;所以 12 21 是同一个组合&#xff08;如…

黑苹果配置清单

手里的MacBookPro已经快沦为电子垃圾了&#xff0c;平时用MacOS比较多&#xff0c;Window用的比较少&#xff0c;而苹果电脑的价格不管是MacBookPro还是MacMini丐版的便宜但是面对现在Window动不动就64g内存的情况就显得微不足道了&#xff0c;高配的价格直接把我劝退&#xff…

PostgreSql HOT 技术

摘自唐成的《PostgreSQL修炼之道&#xff1a;从小工到专家&#xff08;第2版&#xff09;》。 一、概述 因为多版本的原因&#xff0c;当 PostgreSQL 中更新一行时&#xff0c;实际上原数据行并不会被删除&#xff0c;只是插入了一个新行。如果表上有索引&#xff0c;而更新的…