起因:需要从 AWS S3 下载文件,但手头只有 Web 控制台的登录账号(账户 ID、用户名、密码),没有 boto3 所需的 Access Key ID。
经过分析,发现网页版控制台有一个接口会返回临时的 Access Key ID。于是用 playwright 模拟登录,通过 page.on 监听响应数据,拿到临时凭证后再用 boto3 下载相关文件,比手工构造网页请求更方便快捷。
import os, json, urllib, base64
import time, re
from datetime import datetime
from playwright.sync_api import Playwright, sync_playwright, expect
from bs4 import BeautifulSoup
from functools import wraps

# Outbound HTTP(S) proxy, in the URL form `scheme://user:password@host:port`.
proxy = 'http://username:password@192.192.14.32:3128'

# Per-scheme proxy mapping; the download script passes it to the S3 client
# configuration.
proxies = {
    'http': proxy,
    'https': proxy,
}

# Cache directory
CACHE_DIR = r'D:\code\aws_s3\cache'

# Make sure the cache directory exists.
os.makedirs(CACHE_DIR, exist_ok=True)

# Characters replaced by "_" when a URL is turned into a cache file name.
# "/" and ":" were already replaced; the others are not allowed in Windows
# file names, so URLs containing them (e.g. query strings) could never be cached.
_UNSAFE_CHARS = str.maketrans({ch: "_" for ch in '/:\\?*"<>|'})


def _cache_file_for(url):
    """Return the path of the JSON cache file that belongs to *url*."""
    name = url.replace("https://", "").replace("http://", "")
    return os.path.join(CACHE_DIR, name.translate(_UNSAFE_CHARS) + ".json")


def timethis(func):
    """Decorator that prints how long each call to *func* takes.

    Two lines are printed after every call: the elapsed wall-clock time in
    seconds (from ``time.time()``) and the same interval as a ``timedelta``
    (from ``datetime.now()``).

    :param func: the callable to wrap
    :return: a wrapper that forwards all arguments and returns func's result
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        s1 = datetime.now()
        result = func(*args, **kwargs)
        end = time.time()
        s2 = datetime.now()
        func_name = func.__name__
        consume = end - start
        consume2 = s2 - s1
        print(f'{func_name} consume time is ---> {consume}')
        print(f'{func_name} consume minutes is ---> {consume2}')
        return result

    return wrapper


def handle_route(route):
    """Route handler that serves a request from the on-disk cache when possible.

    The request is aborted when its URL contains an entry of ``block_list``,
    fulfilled from the cache file when a usable one exists, and forwarded to
    the network otherwise.

    :param route: route object exposing ``request.url``, ``abort()``,
        ``continue_()`` and ``fulfill(status=, headers=, body=)``
    """
    url = route.request.url

    # URL substrings to block. Currently empty; entries the author tried
    # before include 'telemetry', 'browserCreds', 'module-utils.js', 'svg',
    # 'gif', 'image', 'css'.
    block_list = []
    if any(x in url for x in block_list):
        route.abort()  # drop the request
        return

    cache_file = _cache_file_for(url)
    if not os.path.exists(cache_file):
        # Nothing cached yet: let the request through.
        route.continue_()
        return

    try:
        with open(cache_file, 'r', encoding='utf-8') as f:
            cached_response = json.load(f)
        status = cached_response['status']
        headers = cached_response['headers']
        body = base64.b64decode(cached_response['body'])  # stored base64-encoded
    except (OSError, ValueError, KeyError, TypeError):
        # An unreadable or corrupt cache entry must not leave the route
        # unhandled (the request would hang), so fall back to the network.
        route.continue_()
        return

    # Answer with the cached response instead of hitting the network.
    route.fulfill(status=status, headers=headers, body=body)


def log_response(response):
    """Store a successful script/stylesheet/image response in the cache.

    Only responses with status 200 are written. Caching is best-effort: any
    failure while reading the body or writing the file is ignored.

    :param response: response object exposing ``url``, ``status``,
        ``headers``, ``body()`` and ``request.resource_type``
    """
    # Only CSS, JS and image files are cached.
    if response.request.resource_type not in ('script', 'stylesheet', 'image'):
        return
    if response.status != 200:
        return

    try:
        response_body = {
            'status': response.status,
            'headers': dict(response.headers),
            # body() returns bytes; base64 makes them JSON-serialisable.
            'body': base64.b64encode(response.body()).decode('utf-8'),
        }
        with open(_cache_file_for(response.url), 'w', encoding='utf-8') as f:
            json.dump(response_body, f)
    except Exception:
        # Deliberately best-effort: a resource that cannot be cached is
        # simply fetched from the network again next time.
        pass
# Maps a request URL to {'start_time': <time.time() when the request was seen>}.
requests_info = {}


def log_request(request):
    """Record the time at which *request* was seen, keyed by its URL.

    :param request: object exposing a ``url`` attribute
    """
    requests_info[request.url] = {
        'start_time': time.time(),  # current time = start time of the request
    }


def on_response(response, response_data):
    """Collect the JSON payload of a successful ``s3/tb/creds`` response.

    Responses whose URL does not contain ``s3/tb/creds`` or whose status is
    not 200 are ignored.

    :param response: object exposing ``url``, ``status`` and ``json()``
    :param response_data: list that receives the parsed payload
    """
    if 's3/tb/creds' in response.url and response.status == 200:
        # Parse the body once and reuse it for both the log line and the result.
        creds = response.json()
        print('boto3', creds)
        response_data.append(creds)


# NOTE(review): the original remark here said a saved state file is used to
# skip the login; the function that follows creates a fresh browser context
# and signs in each time - confirm which behaviour is intended.
@timethis
def get_boto3_token():
    """Sign in to the AWS web console with Playwright and capture temporary credentials.

    A headless Chromium is started behind the proxy, the sign-in form is
    filled in, and the S3 console page is opened. Every response whose URL
    contains ``s3/tb/creds`` is collected through ``on_response``; the JSON
    payload of the first one is returned.

    :return: the parsed JSON payload of the first captured credentials
        response (the download script reads ``accessKeyId``,
        ``secretAccessKey`` and ``sessionToken`` from it)
    :raises IndexError: if no credentials response was captured in time
    """
    login_attempts = 10          # upper bound for the sign-in retry loop
    creds_wait_seconds = 30      # how long to wait for the creds response

    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(
            headless=True,
            proxy={
                'server': 'http://username:password@192.192.14.32:3128',
                "username": "username",
                "password": "password",
            },
        )
        try:
            context = browser.new_context()
            page = context.new_page()

            # Never set to True in this function; kept as a manual switch.
            should_abort = False
            # Filled by on_response with the captured credentials payloads.
            response_data = []

            def handle_route(route):
                # Once credentials have been captured, stop loading anything else.
                if should_abort or response_data:
                    print("检测到 'open',停止加载其他内容。")
                    route.abort()
                else:
                    route.continue_()

            # Open the console URL; an unauthenticated session lands on the sign-in form.
            url = 'https://us-west-2.console.aws.amazon.com/s3/buckets/bs?prefix=RESPONSE/'
            page.on("response", log_response)
            # NOTE(review): in Playwright's glob syntax "*" does not match "/",
            # so this pattern may not match any real URL - confirm whether
            # "**/*" was intended before relying on this handler.
            page.route("*", handle_route)
            page.goto(url, timeout=30000 * 3)

            # The root/IAM radio-button step was disabled by the author:
            # sign-in works without it.

            # A Locator object is always truthy, so the original
            # `if page.locator(...)` check was always taken; the steps run
            # unconditionally and rely on Playwright's own waiting.
            print('find')
            page.locator("input[id=\"account\"]").click()
            page.locator("input[id=\"account\"]").fill("1111111")
            print('input username')

            # The username/password form may not be ready yet, so retry -
            # but with an upper bound instead of looping forever.
            for attempt in range(1, login_attempts + 1):
                try:
                    page.locator("input[name=\"username\"]").fill("username")
                    page.locator("input[name=\"password\"]").fill("password")
                    page.locator("#signin_button").click()
                    print('break-->')
                    break
                except Exception:
                    print(datetime.now(), 'error-->')
                    if attempt == login_attempts:
                        raise
                    time.sleep(2)

            print('wait 6 senconds')
            time.sleep(2)
            cookies = page.context.cookies()
            print('cookie', cookies)

            url = 'https://us-west-2.console.aws.amazon.com/s3/buckets/bs-tai?region=us-west-2&bucketType=general&prefix=RESPONSE/2023/&showversions=false'
            # Register the credentials listener before navigating so the
            # response cannot be missed.
            page.on("response", lambda response: on_response(response, response_data))
            page.goto(url, timeout=30000 * 3)
            print('page on response')

            # The credentials request can finish after goto() returns; give
            # it time to arrive while letting Playwright dispatch events.
            deadline = time.time() + creds_wait_seconds
            while not response_data and time.time() < deadline:
                page.wait_for_timeout(500)

            # Debug output only: the CSRF token embedded in the page. A
            # missing or malformed meta tag must not abort the run.
            soup = BeautifulSoup(page.content(), 'lxml')
            meta_tag = soup.find('meta', {'name': 'tb-data'})
            tb_data = meta_tag.get('content') if meta_tag is not None else None
            xsrf_token = None
            if tb_data:
                try:
                    xsrf_token = json.loads(tb_data).get('csrfToken')
                except (ValueError, AttributeError):
                    xsrf_token = None
            print('xsrf token', xsrf_token)
            print('response_data', response_data)

            if not response_data:
                raise IndexError('no s3/tb/creds response was captured')
            return response_data[0]
        finally:
            browser.close()


if __name__ == '__main__':
    get_boto3_token()
# Download script: fetch temporary credentials through the browser login and
# use them to download CSV files from the S3 bucket.
# NOTE(review): `arrow`, `Session`, `Config`, `public_share_path`, `read_csv`
# and `export_result_source` are used here but are not imported or defined in
# the visible part of the file - they must be provided elsewhere.
boto3_token = get_boto3_token()
info = boto3_token
print(arrow.now())
print('boto3_token-->', type(boto3_token), boto3_token)

# Named so that the builtin `id` is not shadowed at module level.
access_key_id = info.get("accessKeyId")
secret_access_key = info.get("secretAccessKey")
aws_session_token = info.get("sessionToken")
session = Session(
    aws_access_key_id=access_key_id,
    aws_secret_access_key=secret_access_key,
    aws_session_token=aws_session_token,
)

# S3 client (for downloads) and bucket resource (for listing), both going
# through the proxy.
bucket = 'bs-tai'
client_s3 = session.client('s3', config=Config(proxies=proxies))
s3 = session.resource('s3', config=Config(proxies=proxies)).Bucket(bucket)


def get_prefix_for_months(months_shift=0):
    """Return the key prefix ``conn/RESPONSE/<YYYY>/<MM>/`` for a month.

    :param months_shift: offset in months relative to the current month
        (0 = this month, -1 = last month)
    :return: the prefix string, ending with a slash
    """
    arrow_month = arrow.now().shift(months=months_shift)
    year = arrow_month.format('YYYY')
    month = arrow_month.format('MM')
    return f'conn/RESPONSE/{year}/{month}/'


# Prefixes of last month and the current month.
prefix_last_month = get_prefix_for_months(months_shift=-1)
prefix_this_month = get_prefix_for_months(months_shift=0)
prefix_list = [prefix_last_month, prefix_this_month]

for prefix in prefix_list:
    for obj in s3.objects.filter(Prefix=prefix):
        key = obj.key
        if not key.endswith('.csv'):
            continue

        # Key layout given the prefix above: conn/RESPONSE/<year>/<month>/<day>/...
        # NOTE(review): a CSV stored directly under the month folder would
        # yield its file name as `day` - confirm that cannot happen.
        parts = key.split('/')
        year = parts[2]   # third path component
        month = parts[3]  # fourth path component
        day = parts[4]    # fifth path component

        local_filename = parts[-1]
        local_file_path = os.path.join(public_share_path, f'{year}{month}{day}', local_filename)
        if not os.path.exists(local_file_path):
            local_file_dir = os.path.dirname(local_file_path)
            os.makedirs(local_file_dir, exist_ok=True)
            client_s3.download_file(bucket, key, local_file_path)
            print(f'Downloaded {local_file_path}')
            # NOTE(review): the original indentation was lost; the processing
            # steps are assumed to run only for newly downloaded files.
            read_csv(local_file_path, day=f'{year}{month}{day}')
            export_result_source(day=f'{year}{month}{day}')
参考
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
https://cuiqingcai.com/36045.html
https://www.cnblogs.com/neozheng/p/13563841.html
https://stackoverflow.com/questions/35803027/retrieving-subfolders-names-in-s3-bucket-from-b-boto3
https://stackoverflow.com/questions/35803027/retrieving-subfolders-names-in-s3-bucket-from-b-boto3
https://stackoverflow.com/questions/29378763/how-to-save-s3-object-to-a-file-using-boto3