Python 全栈系列255 UCS实践:按ID同步数据

说明

这是一个常见的使用场景,实测下来效果良好。

内容

1 实验场景

将库中所有的数据取出,送到队列

本质上,这是一种单向不返回的模式。除了在遍历全库有用,在进行回测时也是一样的,时间就是单向不返回的。

通过UCS,将任意离散的数据记录归并到了一个更大的单位下。按照brick、block、part、shard四个层级,使得数据的管理兼顾到人的记忆特性,以及程序批量处理的效率。一个brick通常代表一万条数据,之后以千不断进位。到part级别已经是十亿的容量了。

UCS将所有数据的编号分为三类:

  • 1 数值类。从0开始编号,每条记录递增,这个就是mysql的自增id。
  • 2 时间类。以小时为brick,天为block,月为part, 年为shard。
  • 3 字符类。所有为数值、非时间类的主键采用字符编号。一般采用md5码计算32位字符,然后根据 每8个字符之和对10取余。如果数据很大,也可以考虑对100,甚至1000取余。

UCS规范已经嵌在GFGoLite服务中,通过UCS对象进行快速实现。

以下是本次实验的文件

  • 1 首先要声明worker的缓存空间名称,一般只需一次,后续其他的worker也可以使用这个空间。
  • 2 worker并不是服务状态的,所以每次启动必须要载入元数据,在结束本次执行后,要保存元数据
  • 3 worker的功能是从clickhouse中取数,然后存到stream中
  • 4 QM方面,通过声明远端服务器的RedisAgent完成
  • 5 从GlobalBuffer中获取clickhouse的连接参数
  • 6 使用CHClient来进行实际的控制
  • 7 执行前,待执行的brick_list应该被更新后放在缓存内。
  • 8 执行时,worker先取出待执行的brick_list和已经执行的brick(last_brick)
  • 9 如果last_brick是空,说明这是初始状态,cur_brick为brick_list中的第一个
  • 10 其他情况,cur_brick始终可以取到下一个,直到结尾(此时cur_brick始终等于last_brick),worker会跳过执行
  • 11 在正常执行时,worker通过ucs知道当前数据主键的范围,所以可以根据这个条件取出对应brick的原始数据
  • 12 执行结束时,worker将cur_brick更新到last_brick中。

最终,没执行一次脚本,就会搬运一个brick到远端队列。

'''
UCS顺序Worker的概念Worker采用UCS的顺序编号:id编号、时间编号Worker依赖Buffer提供运行时参数:- 1 brick列表
- 2 上一次处理brick
'''# 1 创建变量空间(Once) worker.general (TroubleShooting ts_001)
# 2 读取需要处理的brick_list(Manually)from Basefuncs import * worker_buffer_space = 'sp_worker.general'
tier1 = 'xxx'
tier2 = 'ucs_brick_ordered.sniffer'
prefix = '.'.join([worker_buffer_space,tier1,tier2]) +'.'target_redis_agent_host = 'http://IP:24118/'
target_redis_connection_hash = None 
target_stream_name = 'xxxx'
target_stream_max_len = 10000000qm = QManager(redis_agent_host = target_redis_agent_host, redis_connection_hash = target_redis_connection_hash, q_max_len = target_stream_max_len)
# ==========================  Load 
gb = GlobalBuffer()
# manually + brick_list
# gb.setx(prefix +'brick_list',brick_list,persist=True)
brick_list=  gb.getx(prefix +'brick_list')
last_brick_handled = gb.getx(prefix +'last_brick_handled')  or ''
last_runtime = gb.getx(prefix +'last_runtime')# brick_list需要保证顺序
if last_brick_handled is None:current_brick =  brick_list[0]
else:if brick_list.index(last_brick_handled) ==  len(brick_list) -1:current_brick = last_brick_handledelse:current_brick = brick_list[brick_list.index(last_brick_handled) +1]print('current_brick', current_brick)if current_brick != last_brick_handled:
# 根据buffer知道要处理的数据ucs = UCS()current_brick_bounds = ucs.get_brick_bounds(current_brick)# ==========================  Processingclick_para = gb.getx('sp_global.buffer.local.container.clickhouse.my_database.para')chc = CHClient(**click_para)# 根据bounds获取数据query_sql = 'select a, b, c, d from xxx where id >= %s and id < %s' % (current_brick_bounds[0], current_brick_bounds[1] )brick_data = chc._exe_sql(query_sql)brick_data_df = pd.DataFrame(brick_data, columns = ['a','b','c','d'])brick_data_df.columns = ['id','task_for','before','after']brick_data_df['function_type'] = 'ucs_worker'brick_data_df['rec_id'] = brick_data_df['id']brick_data_listofdict = brick_data_df.to_dict(orient='records')# ==========================  Postcur_q_len = qm.stream_len(target_stream_name)cur_write_resp = qm.parrallel_write_msg(target_stream_name, brick_data_listofdict, time_out=180)# ==========================  Updateif cur_write_resp['status']:last_brick_handled = current_brickgb.setx(prefix +'last_brick_handled', last_brick_handled, persist =True)print('current batch ', len(brick_data_listofdict),' 、target stream len',qm.stream_len(target_stream_name))
else:last_brick_handled = current_bricklast_runtime = get_time_str1()
gb.setx(prefix +'last_runtime', last_runtime)

flask_celery

后来我用了python的标准logging包 + RotateLog的方式记录,不过以下脚本仍然有用。

执行脚本

对于非标准的程序执行,通过脚本方式放在本地的home文件夹下,由celery调度。
注意,被celery执行的脚本,里面最好都写上绝对路径,因为在使用celery worker执行时,当前路径会默认为服务的启动路径 /opt/flask_celery。
例如LOG_FILE,只写tem.log,那么就会在flask_celery下发生修改。
始终注意的是,由flask celery执行的应该是简单的流转任务,而不是复杂的计算任务。如果有,就应该放在某个容器里执行。
再考虑到执行环境,flask celery是在base环境启动的,对应的包应该都能用。如果要执行特别的任务,就要在脚本里指定环境的切换。

vim /home/test_exe.sh

#!/bin/bash
# 日志文件路径
LOG_FILE="/home/tem.log"# 获取当前时间并追加到日志文件
echo "$(date '+%Y-%m-%d %H:%M:%S') - 脚本执行" >> $LOG_FILE# 检查日志文件中的记录条数
LINE_COUNT=$(wc -l < "$LOG_FILE")# 如果记录条数超过10000条,则截断日志文件以保留最新的100条记录
if [ "$LINE_COUNT" -gt 10000 ]; then# 计算需要保留的行数LINES_TO_KEEP=100# 截断日志文件tail -n $LINES_TO_KEEP $LOG_FILE > temp.log && mv temp.log $LOG_FILE
fi

然后将脚本改为可执行
chmod +x /home/test_exe.sh
执行测试


import requests as req param_dict = {'script_path': '/home/test_exe.sh'}resp = req.post('http://127.0.0.1:24104/exe_sh/',json = param_dict )In [5]: !cat tem.log
2024-06-17 14:55:54 - 脚本执行
2024-06-17 14:59:14 - 脚本执行
2024-06-17 15:21:13 - 脚本执行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/32288.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Acrobat Pro DC 2021:Mac/Win平台上全面高效的PDF编辑器

Acrobat Pro DC 2021是一款在Mac和Windows平台上广受欢迎的PDF编辑器&#xff0c;它凭借其全面的功能和高效的性能&#xff0c;为用户提供了卓越的PDF处理体验。 一、编辑功能全面强大 Acrobat Pro DC 2021允许用户轻松创建、编辑、合并、转换、签署和分享PDF文件。无论是对P…

学习笔记——交通安全分析07

目录 前言 当天学习笔记整理 2交通行为、心理与安全 3道路交通事故数据 4信控交叉口交通安全分析 结束语 前言 #随着上一轮SPSS学习完成之后&#xff0c;本人又开始了新教材《交通安全分析》的学习 #整理过程不易&#xff0c;喜欢UP就点个免费的关注趴 #本期内容接上一…

一个电商创业者眼中的618:平台大变局

战役结束了&#xff0c;战斗还在继续。 一位朋友去年5月创业&#xff0c;网上卖咖啡&#xff0c;这个赛道很拥挤&#xff0c;时机也不好&#xff0c;今年是他参加第一个618。朋友说&#xff0c;今年的目标是锤炼团队&#xff0c;总结方法&#xff0c;以及最重要的——活下去。…

水系统阻力计算

所谓水泵的选取计算其实就是估算&#xff08;很多计算公式本身就是估算的&#xff09;&#xff0c;估算分的细致些考虑的内容全面些就是精确的计算。 特别补充&#xff1a;当设计流量在设备的额定流量附近时&#xff0c;上面所提到的阻力可以套用&#xff0c;更多的是往往都大…

Spring (72)如何在Spring中使用缓存

在Spring框架中&#xff0c;使用缓存是一种有效的方式来提高应用程序性能&#xff0c;减少数据库或者计算密集型操作的负担。Spring提供了一个声明式的缓存抽象&#xff0c;它允许开发者通过注解来简单地将缓存应用到应用程序中。下面我们将深入探讨Spring缓存的使用&#xff0…

【C++题解】1713 - 输出满足条件的整数3

问题&#xff1a;1713 - 输出满足条件的整数3 类型&#xff1a;简单循环 题目描述&#xff1a; 有一个数列&#xff0c;该数列的前 4 个数是&#xff1a; 1 4 7 10 &#xff1b; 请从键盘读入一个正整数 n &#xff0c;请通过观察前 4 项的规律&#xff0c;输出 1∼n 之间所有…

矩阵计算王牌软件 MATLAB,如何应对 AI 研发应用新挑战

提到 MATLAB&#xff0c;可能是所有控制工程与科学研究工作人员在学生时代就开始接触的软件。在 CSDN 社区 MATLAB 已经积累了 130 多万篇的文章与文档资料&#xff0c;影响了一代又一代的软件开发工程师。MATLAB 背后的 MathWorks 公司是一家已经成立四十年的老牌软件厂商&…

AudioSep:从音频中分离出特定声音(人声、笑声、噪音、乐器等)本地一键整合包下载

AudioSep是一种 AI 模型&#xff0c;可以使用自然语言查询进行声音分离。这一创新性的模型由Audio-AGI开发&#xff0c;使用户能够通过简单的语言描述来分离各种声音源。 比如在嘈杂的人流车流中说话的录音中&#xff0c;可以分别提取干净的人声说话声音和嘈杂的人流车流噪声。…

4种典型家庭教育方式,无论开始是哪一种,都会过渡到最后一种

家庭教育&#xff0c;是孩子教育的一个重要组成部分&#xff0c;事实上是对孩子影响最大的一种教育方式&#xff0c;绝大部分家庭教育都是由孩子的父母来完成的。 家庭教育的特点 家庭教育具有很明显的启蒙性、长期性、全面性。 1.启蒙性。我们的孩子对外部世界的认识和了解&am…

咖啡事故,上海Manner咖啡店,1天两起店员和顾客发生冲突

上海咖啡店Manner&#xff0c;一天的时间竟然发生两起店员和员工发生肢体冲突&#xff1a; 事情详情&#xff1a; Manner威海路716店事件: 店员泼顾客咖啡粉&#xff0c;随后被辞退品牌方回应媒体&#xff0c;表示将严肃处理Manner梅花路门店事件:顾客因等待时间长抱怨&…

解锁PDF处理新境界:轻松调整字体,让你的文档焕然一新!

数字化时代&#xff0c;PDF文件已经成为我们日常办公和学习中不可或缺的一部分。它们为我们提供了方便的阅读体验&#xff0c;同时也保证了文档内容的完整性和格式的统一性。然而&#xff0c;有时候我们可能会遇到一个问题&#xff1a;如何轻松调整PDF文件中的字体&#xff0c;…

Linux内核学习——linux内核体系结构(1)

1 Linux内核模式 学习的是Linux 0.11内核&#xff0c;采用的是单内核模式。单内核模式的主要优点是内核代码结构紧凑、执行速度快&#xff0c;但是层次结构性不强。 操作系统如何提供的服务流程&#xff1f; 应用主程序使用指定的参数值执行系统调用指令(int x80)&#xff0…

如何恢复 Mac 数据?适用于 Mac 的免费磁盘恢复软件

对于大多数 Mac 电脑用户来说&#xff0c;丢失数据是他们最不想遇到的噩梦之一。然而&#xff0c;无论我们多么小心地使用 Mac&#xff0c;多么有条理地存储重要文件&#xff0c;我们仍然有可能丢失 Mac 上的数据。某些硬件故障更有可能导致您意外丢失文件。除此之外&#xff0…

Linux htop命令使用

文章目录 简介界面介绍第一行第二行第三行第四行 如何使用 简介 htop 是一个类似于 top 的命令&#xff0c;但具有更丰富的功能和更友好的界面。它可以实时显示系统中各个进程的资源占用情况&#xff0c;如 CPU 使用率、内存使用率等。以下是对 htop 命令的完全解析&#xff1…

echarts Y轴展示时间片段,series data数据 也是时间片段,鼠标放上去 提示框显示对应的时间片段

功能要求 1、折线图&#xff0c;展示每天对应的一个时间片段 2、echarts Y轴展示时间片段&#xff0c;如&#xff1a;[00:00,03:00,05:15] 3、X轴展示日期&#xff0c;如&#xff1a;[xx年xx月xx日] 后端返回的数据结构&#xff0c;如 [{xAdate:"2024-06-15",data:…

【c++11 之智能指针2 unique、shared、weak *_ptr 原理及案例】及四种智能指针对比分析

C11引入了智能指针&#xff08;Smart Pointers&#xff09;的概念&#xff0c;它们是一种自动管理内存的生命周期的指针类型&#xff0c;帮助开发者避免内存泄漏和野指针等问题。C11标准库中定义了三种智能指针&#xff1a;std::unique_ptr、std::shared_ptr和std::weak_ptr。 …

异步开发的终极答案—协程

我们在之前的文章中讲过,在并发场景下,传统的基于多线程的命令式开发模型虽然比较简单,但并发数高了之后资源占用较高,大量线程会阻塞;而响应式编程模式我们可以通过异步化处理提升系统资源的利用效率,但异步开发有违人的直觉,门槛比较高。作为成年人,我们肯定希望全都…

Linux系统及常用命令介绍

一.介绍 Linux一套免费使用和自由传播的类Unix操作系统&#xff0c;是一个遵循POSIX的多用户、多任务、支持多线程和多CPU的操作系统。Linux系统的说明可以自行百度&#xff0c;知道这几点即可&#xff1a; 1.Linux中一切都是文件&#xff1b; 2.Linux是一款免费操作系统&…

【CT】LeetCode手撕—42. 接雨水

目录 题目1- 思路2- 实现⭐42. 接雨水——题解思路 3- ACM实现 题目 原题连接&#xff1a;42. 接雨水 1- 思路 模式识别&#xff1a;求雨水的面积 ——> 不仅是只求一个比当前元素大的元素&#xff0c;还要求面积 单调栈 应用场景&#xff0c;需要找到左边比当前元素大的…

周末总结(2024/06/22)

工作 人际关系核心实践&#xff1a; 要学会随时回应别人的善意&#xff0c;执行时间控制在5分钟以内 坚持每天早会打招呼 遇到接不住的话题时拉低自己&#xff0c;抬高别人(无阴阳气息) 工作上的要点 现状&#xff08;接受破烂现状&#xff0c;改变状态&#xff09; - 这周没…