AWS 消息队列服务 SQS
- 引言
- 什么是 SQS
- SQS 访问策略 Access Policy
- 示例:如何为 DataLake Subscription 配置 SQS
引言
应用系统需要处理海量数据,数据发送方和数据消费方是通过什么方式来无缝集成消费数据的,AWS 提供 SQS 消息队列服务来解决这个无缝通讯问题,使得应用之间解耦,异步,可靠,灵活和可维护性。
什么是 SQS
AWS SQS(Amazon Simple Queue Service)是一种由 Amazon Web Services 提供的完全托管的消息队列服务。它允许分布式应用程序的各个组件之间异步传输消息,而无需丢失消息或需要其他服务始终可用。以下是 AWS SQS 的主要特点和功能:
- 消息队列
发送和接收消息:SQS 允许应用程序组件之间通过消息队列进行通信。发送方将消息发送到队列中,接收方从队列中读取消息。
消息持久性:消息在队列中保存直到它们被接收并删除。即使发送方或接收方出现故障,消息仍然存在。 - 两种队列类型
标准队列:提供几乎无限的吞吐量,允许每秒处理大量消息。标准队列保证消息至少被处理一次,但可能偶尔会发生消息重复或顺序错乱。
FIFO 队列(先入先出):确保消息按照发送的顺序严格处理且每条消息仅处理一次。适用于顺序处理非常重要的场景。 - 消息延迟
SQS 支持对消息的发送进行延迟处理,允许消息在指定时间之后才变为可见。 - 自动扩展
SQS 是一个完全托管的服务,可以根据流量自动扩展,不需要用户自行管理容量。 - 与其他 AWS 服务集成
SQS 可以与许多 AWS 服务无缝集成,比如与 AWS Lambda 配合使用,实现无服务器计算;或与 Amazon S3、EC2 等结合,构建复杂的分布式系统。
分离系统组件:通过将应用程序组件解耦,你可以在系统的各个部分之间创建一个异步的、可靠的通信机制,这有助于提高系统的灵活性和可维护性 - 消息处理
SQS 提供了可调的消息保留时间(1 分钟到 14 天),以及消息可见性超时,用于控制消息在队列中的处理方式。 - 安全性
SQS 支持通过 AWS Identity and Access Management (IAM) 控制对队列的访问权限,并支持加密,确保消息的安全传输和存储。
AWS SQS 非常适用于需要解耦、扩展以及提高系统容错能力的场景,如订单处理系统、任务分配系统等。 - 成本效益:SQS 按需计费,你只需为你使用的消息量和请求付费,无需预付费用或长期承诺。
SQS 访问策略 Access Policy
在 AWS SQS 中,访问策略(Access Policy)用于控制对 SQS 队列的访问权限。访问策略可以定义谁(即哪些用户或服务)可以访问队列,以及他们能够执行的操作。访问策略是基于 JSON 格式的策略文档,通常通过 AWS Identity and Access Management (IAM) 实现。以下是有关 SQS 访问策略的主要概念和示例。
- 策略结构
SQS 访问策略通常包括以下几个部分:
- Version:指定策略语言的版本。
- Id(可选):策略的唯一标识符。
- Statement:一个或多个权限声明,定义允许或拒绝的操作。
每个 Statement 通常包含以下元素: - Effect:指定是允许(Allow)还是拒绝(Deny)访问。
- Action:指定允许或拒绝的具体操作(例如:sqs:SendMessage、sqs:ReceiveMessage)。
- Resource:指定策略应用的资源,即具体的 SQS 队列 ARN。
- Principal:指定可以执行操作的用户、角色或服务。
- Condition(可选):指定进一步限制访问权限的条件。
- 常见操作
sqs:SendMessage:允许用户发送消息到队列。
sqs:ReceiveMessage:允许用户从队列接收消息。
sqs:DeleteMessage:允许用户从队列中删除消息。
sqs:GetQueueAttributes:允许用户获取队列属性。 - 示例策略
以下是一个示例策略,允许某个特定的 AWS 账户 ID(111122223333)向特定的 SQS 队列发送消息。
{"Version": "2012-10-17","Id": "ExamplePolicy","Statement": [{"Effect": "Allow","Principal": {"AWS": "arn:aws:iam::111122223333:root"},"Action": "sqs:SendMessage","Resource": "arn:aws:sqs:us-east-1:444455556666:MyQueue"}]
}
- 条件(Conditions)
条件语句可以用来进一步限定访问权限。例如,您可以指定允许访问的时间段、来源 IP 地址,或者要求使用 HTTPS 来进行请求。
下面是一个包含条件的策略示例,它只允许从指定的 IP 地址范围内访问 SQS 队列:
{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Principal": "*","Action": "sqs:SendMessage","Resource": "arn:aws:sqs:us-east-1:444455556666:MyQueue","Condition": {"IpAddress": {"aws:SourceIp": "192.168.1.0/24"}}}]
}
- 访问控制的最佳实践
最小权限原则:只授予用户或服务所需的最低权限。
使用 IAM Roles:在可能的情况下,使用 IAM 角色代替直接的用户访问,以提高安全性。
定期审核策略:定期检查和更新策略,确保其符合当前的安全要求。
通过访问策略,您可以精细地控制谁可以访问 SQS 队列及他们能够执行的操作,从而保障消息队列的安全性。
示例:如何为 DataLake Subscription 配置 SQS
要为 DataLake Subscription 配置 Amazon SQS,您需要执行以下步骤:
- 创建 SQS 队列:首先,您需要在 AWS 管理控制台或使用 AWS SDK 创建一个 SQS 队列。
- 配置 DataLake 订阅:将 DataLake 订阅配置为将事件发送到您创建的 SQS 队列。
- 设置权限:确保 DataLake 有权限向您的 SQS 队列发送消息。
详细步骤
- 创建 一个 standard SQS 队列
使用 AWS 管理控制台利用 CloudFormation 或 AWS SDK 创建一个新的 SQS 队列。
import boto3# 创建 SQS 客户端
sqs = boto3.client('sqs')# 创建队列
response = sqs.create_queue(QueueName='MyDataLakeQueue',Attributes={'DelaySeconds': '0','MessageRetentionPeriod': '86400'}
)queue_url = response['QueueUrl']
print(f"Queue created: {queue_url}")
- 配置 DataLake 订阅
导航到 DataLake 服务并创建和配置一个订阅,将事件发送到您创建的 SQS 队列。
例如下面创建一个订阅,所有 collection “SMOKE2” 的 Create 事件都会发送到 target 为 arn:aws:sqs:us-east-1:123456789012:MyDataLakeQueue 的 SQS 队列
{"subscription-name": "my-userdoc-subscriptionsqs","filter": {"event-name": ["Object::Create"],"collection-id": ["SMOKE2"]},"targets": [{"type": "sqs","value": "arn:aws:sqs:us-east-1:123456789012:MyDataLakeQueue"}],"schema-version": ["v2"]
}
- 设置权限
确保 DataLake 有权限向您的 SQS 队列发送消息。您需要为 SQS 队列配置适当的访问策略。
例如下面配置只有来自这个 resource “arn:aws:iam::123456789012:role/release-DataLake-EventPublish” 才有权限发送到这个 SQS。
{"Id": "AllowDataLakeEventsAccess","Version": "2012-10-17","Statement": [{"Sid": "AllowDataLakeEventsAccess","Effect": "Allow","Principal": {"AWS": "*"},"Action": "sqs:SendMessage","Resource": "<EndpointARN>","Condition": {"ArnEquals": {"aws:PrincipalArn": ["arn:aws:iam::123456789012:role/release-DataLake-EventPublish"]}}}]
}
将上述策略附加到您的 SQS 队列。您可以在 AWS 管理控制台中编辑队列的访问策略,或者使用 AWS SDK 更新队列的访问策略。
完成这些步骤后,您的 DataLake 订阅将被配置为将事件发送到指定的 SQS 队列。