ES数据过多,索引拆分

公司企微聊天数据存储在 ES 中,虽然按照企业分储在不同的ES 索引中,但某些常用的企微主体使用量还是很大。4年中一个索引存储数据已经达到46多亿条数据,占用存储3.1tb,
在这里插入图片描述

ES 配置在这里插入图片描述

由于多一个副本,存储得翻倍,成本考虑,所以没有设置副本分片(不建议你们这么做)

索引拆分后,只需要修改插入时的代码逻辑,设置好别名后,查询代码是不需要改动的

进入主题,拆分索引 ,数据按年进行拆分

1.设置索引模板

PUT _template/chat_message_template
{"index_patterns": ["chat-message-*"],"settings": {"number_of_shards": 15,  // 这里和集群的节点对应,需要是节点的整数倍"number_of_replicas": 0,  // 分片副本,生产环境最好设置>=1"refresh_interval": "20s","codec": "best_compression","max_result_window": "100000000"},"mappings": {"properties": {"msg_id": {"type": "keyword"},"msg_time": {"type": "long"},"msg_type": {"type": "keyword"},"text": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart","term_vector": "with_positions_offsets"},"image": {"type": "text"},.........}}
}

2.根据模板新建索引

PUT chat-message-2021

3.reindex 到新索引

!!重要!!

# reindex 前查看磁盘是否够用,索引切分后,占用磁盘大小比一个索引大了一些
# 尽量多留一些空间给新索引 , 扩容前磁盘占用40%左右是个不错的选择,请提前进行扩容
# 我们遇到了空间不够用的情况,后扩容,虽然说是滚动扩容,客服说任务可能会取消,但扩容后任务还在,所以尽量提前把空间扩容好
# 查看各节点的分片数量及磁盘使用
GET /_cat/nodes?v&h=name,shards,disk.used_percent
# 结果 
name                              disk.used_percent
es-cn-**-data-g4-3             86.12
es-cn-**-data-g4-2             81.75
es-cn-**-data-g4-0             85.94
es-cn-**-data-g4-4             85.73
es-cn-**-data-g4-1             85.67

reindex命令

‌# 限流保护‌:添加?requests_per_second=1000参数避免集群过载‌
# 异步执行,会直接返回 taskId:   wait_for_completion
# 执行完后,修改以下 msg_time 再执行,可以并行迁移每年的数据POST _reindex?requests_per_second=1000&wait_for_completion=false
{"conflicts": "proceed",  // 默认情况下,当发生version conflict的时候,_reindex会被abort。解决方案设置为“proceed”:"source": {"index": "chat-message-0613","size":5000,"query": {"range": {"msg_time": {"gte": 1609430400000,"lt": 1610380800111}}}},"dest": {"index": "chat-message-2021","op_type": "create" // 把op_type设置为create,_reindex API,只在dest index中添加不不存在的doucments。如果相同的documents已经存在,则会报version confilct的错误。}
}

4.查看进度

GET _tasks?detailed=true&actions=*reindex
# 返回结果 
{"nodes" : {"z5VL_HJ2Qn****AMQ" : {"name" : "es-cn-**-data-g4-3","transport_address" : "121.**.114.80:9300","host" : "121.**.114.80","ip" : "121.**.114.80:9300","roles" : ["data","ingest","master","ml","remote_cluster_client","transform"],"attributes" : {"zone_id" : "cn-shanghai-g","ml.machine_memory" : "64887980032","ml.max_open_jobs" : "20","xpack.installed" : "true","zone" : "cn-shanghai-g","transform.node" : "true"},"tasks" : {"z5VL_HJ2Qn****YhoAMQ:6597302" : {"node" : "z5VL_HJ2Qn****YhoAMQ","id" : 6597302,"type" : "transport","action" : "indices:data/write/reindex","status" : {"total" : 484234,"updated" : 0,"created" : 0,"deleted" : 0,"batches" : 11,"version_conflicts" : 55000,"noops" : 0,"retries" : {"bulk" : 0,"search" : 0},"throttled_millis" : 99999,"requests_per_second" : 1000.0,"throttled_until_millis" : 2410},"description" : "reindex from [chat-message-0613] to [chat-message-2021][_doc]","start_time_in_millis" : 1742189556123,"running_time_in_nanos" : 109710051531,"cancellable" : true,"headers" : {"trace_id" : "ywiWopUBbHoVT9Iz8TeX"}}}}}
}

5.查看文档数量是否相同

把数据和 kibana 中索引文档数据比对即可
例:查看2021年的数据量
GET chat-message-0613/_count
{"query": {"range": {"msg_time": {"gte": 1609459200000,"lt": 1640966400000}}}
}
# 数量对的上的话,可以把老索引给删除了,释放磁盘,在 kibana 中操作即可

6.修改别名

先删除之前的别名,把老的别名放到新建的索引上
POST _aliases
{"actions" : [{"remove" : {"index" : "chat-message-0613" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2021" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2022" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2023" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2024" , "alias" : "chat-message"}},{"add" : {"index" : "chat-message-2025" , "alias" : "chat-message"}}]
}
修改别名后,ES 查询部分不用修改

取消reindex命令

POST _tasks/z5VL_HJ2Qn****YhoAMQ:6597302/_cancel

插入文档数据的部分逻辑代码

func (s *ElasticService) CreateIndex(index, alias, mapping string) (err error) {// 判断索引是否存在exists, err := sys.Elastic().IndexExists(index).Do(context.Background())if err != nil {sys.Log().Error(sys.NewProjectErr(1000401).Error() + index)return}if exists {// 加锁,说明此索引已经存在_, _ = model.Factory.Lock.AddLock(model.LockSync, "mapping:ext:"+index, 370*24*3600)return}defer func() {model.Factory.Lock.UnLocK(model.LockSync, "mapping:create:"+index)}()// 多线程创建防止出错,加锁ok, err := model.Factory.Lock.AddLock(model.LockSync, "mapping:create:"+index, 120)if err != nil {return}if !ok {err = errors.New("未抢占到锁")return}// 创建indexcreateIndex, err := sys.Elastic().CreateIndex(index).Body(mapping).Do(context.Background())if err != nil {sys.Log().Error(sys.NewProjectErr(1000401).Error() + index)return}if !createIndex.Acknowledged {// Not acknowledgedsys.Log().Error(sys.NewProjectErr(1000401).Error() + index)return}// 创建aliasputAlias, err := sys.Elastic().Alias().Add(index, alias).Do(context.Background())if err != nil {sys.Log().Error(sys.NewProjectErr(1000402).Error() + alias)return}// 可选:检查别名操作是否被集群确认(按需添加)if !putAlias.Acknowledged {sys.Log().Error("Alias creation not acknowledged: " + alias)return}return
}// 数据按年切分后请求此方法
func (s *ElasticService) Bulk(index, alias string, chatMsg []model.ChatMessage) (bulkResponseItem []*elastic.BulkResponseItem, err error) {tryTime := 0
CreateIndex:// 分布锁ok, err := model.Factory.Lock.ExistLock(model.LockSync, "mapping:ext:"+index)if err != nil {return}if !ok {errCreate := s.CreateIndex(index, alias, ChatMessageMapping)if errCreate != nil {if tryTime < 10 {tryTime++time.Sleep(time.Second)goto CreateIndex} else {err = errCreatereturn}}}bulkRequest := sys.Elastic().Bulk().........}

最后

整个迁移非常费时间。 让grok 帮我算了下(ps. 同样的提问,国内大模型都没算出来 ,还是MASK的强啊)
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/74597.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

存储服务器是指什么

今天小编主要来为大家介绍存储服务器主要是指什么&#xff0c;存储服务器与传统的物理服务器和云服务器是不同的&#xff0c;其是为了特定的目标所设计的&#xff0c;在硬件配置方式上也有着一定的区别&#xff0c;存储空间会根据需求的不同而改变。 存储服务器中一般会配备大容…

golang不使用锁的情况下,对slice执行并发写操作,是否会有并发问题呢?

背景 并发问题最简单的解决方案加个锁,但是,加锁就会有资源争用,提高并发能力其中的一个优化方向就是减少锁的使用。 我在之前的这篇文章《开启多个协程,并行对struct中的每个元素操作,是否会引起并发问题?》中讨论过多协程场景下struct的并发问题。 Go语言中的slice在…

Java知识整理round1

一、常见集合篇 1. 为什么数组索引从0开始呢&#xff1f;假如从1开始不行咩 数组&#xff08;Array&#xff09;&#xff1a;一种用连续的内存空间存储相同数据类型数据的线性数据结构 &#xff08;1&#xff09;在根据数组索引获取元素的时候&#xff0c;会用索引和寻址公式…

【C++指针】搭建起程序与内存深度交互的桥梁(下)

&#x1f525;&#x1f525; 个人主页 点击&#x1f525;&#x1f525; 每文一诗 &#x1f4aa;&#x1f3fc; 往者不可谏&#xff0c;来者犹可追——《论语微子篇》 译文&#xff1a;过去的事情已经无法挽回&#xff0c;未来的岁月还可以迎头赶上。 目录 C内存模型 new与…

JavaScript创建对象的多种方式

在JavaScript中&#xff0c;创建对象有多种方式&#xff0c;每种方式都有其优缺点。本文将介绍四种常见的对象创建模式&#xff1a;工厂模式、构造函数模式、原型模式和组合模式&#xff0c;并分析它们的特点以及如何优化。 1. 工厂模式 工厂模式是一种简单的对象创建方式&am…

muduo库的思路梳理

前言 对于muduo库源码的剖析我发现还是有些混乱的&#xff0c;所以这里再次梳理一下muduo网络库争取可以简单明了 首先对于muduo库来说&#xff0c;不能想的得太过于复杂&#xff0c;它无非就是一个线程池加上epoll组成的网络库 这里我们从用的角度出发理解muoduo网络库 #inc…

Keil5 安装全攻略

Keil5 安装全攻略 Keil5 是一款广泛用于嵌入式开发的 IDE&#xff0c;支持多种微控制器架构&#xff08;如 ARM、C51&#xff09;。本文将详细介绍 Keil5 的安装步骤、常见问题及解决方法&#xff0c;帮助您快速上手。 1. 安装前的准备工作 (1) 系统要求 操作系统&#xff1…

C语言do...while语句将数字反转后输出

一、题目引入 输入一个数字,将各位数字反转后输出? 参考代码: 二、分析代码 接着图片中的分析 第一 ->a 的值变为12 第二 ->进入while循环条件,a为12不等于0循环才停止(a的值为12,显然不等于0) 所以继续进行循环 第三 ->此时b的值为12取各位上的数字(即2) 打印…

优选算法系列(前缀和 _下) k

目录 五&#xff1a;和为 k 的子数组&#xff08;medium&#xff09; 题目链接&#xff1a;560. 和为 K 的子数组 - 力扣&#xff08;LeetCode&#xff09; 解法&#xff1a; 代码&#xff1a; 六&#xff1a;和可被 K 整除的子数组&#xff08;medium&#xff09; 题目链…

mac m3 pro 部署 stable diffusion webui

什么是Stable Diffusion WebUI &#xff1f; Stable Diffusion WebUI 是一个基于Stable Diffusion模型开发的图形用户界面&#xff08;GUI&#xff09;工具。通过这个工具&#xff0c;我们可以很方便的基于提示词&#xff0c;描述一段文本来指导模型生成相应的图像。相比较通过…

OpenCV图像拼接(6)根据权重图对源图像进行归一化处理函数normalizeUsingWeightMap()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::detail::normalizeUsingWeightMap 是 OpenCV 中用于图像拼接细节处理的一个函数。它根据权重图对源图像进行归一化处理&#xff0c;通常用于…

23种设计模式-外观(Facade)设计模式

外观设计模式 &#x1f6a9;什么是外观设计模式&#xff1f;&#x1f6a9;外观设计模式的特点&#x1f6a9;外观设计模式的结构&#x1f6a9;外观设计模式的优缺点&#x1f6a9;外观设计模式的Java实现&#x1f6a9;代码总结&#x1f6a9;总结 &#x1f6a9;什么是外观设计模式…

capl语言基础语法(二)

1.strncpy&#xff1a;将字符串复制到另一个字符串中。 输入&#xff1a; dest 是目标字符串。 src 是源字符串。 n 是要复制的最大字符数。 语法&#xff1a; char *strncpy(char *dest, const char *src, size_t n); 例子&#xff1a; strncpy(gStringRep,"",…

QLoRA和LoRA 微调

QLoRA 其实是一种结合了量化和 LoRA 微调技术的统一方法&#xff0c;而不是同时使用两种不同的微调方式。换句话说&#xff0c;QLoRA 的意思就是&#xff1a;先把大模型的主权重用低精度&#xff08;例如 4-bit&#xff09;量化&#xff0c;从而大幅减少存储需求&#xff1b;然…

Qt Concurrent 并发 Map 和 Map-Reduce

并发 Map 和 Map-Reduce QtConcurrent::map()会对容器中的每个项目应用一个函数&#xff0c;对项目进行就地修改。QtConcurrent::mapped() 类似于 map()&#xff0c;但它返回的是一个包含修改内容的新容器。QtConcurrent::mappedReduced() 类似于 mapped()&#xff0c;只不过修…

RT-Thread-线程管理

一、线程管理 RT_Thread线程管理主要是实现线程管理和调度&#xff0c;线程分为用户线程和系统线程。RT_Thread的线程调度器是抢占式的&#xff0c;寻找就绪状态最高优先级线程。 线程管理的API函数 创建线程函数 rt_thread_t rt_thread_create( const char *name, //线程名称 …

【CC2530 教程 十二】CC2530 Z-Stack 硬件抽象层

目录 一、硬件抽象层简介&#xff1a; &#xff08;1&#xff09;HAL 硬件抽象层是什么&#xff1f; &#xff08;2&#xff09;通俗易懂的解释&#xff1a; &#xff08;3&#xff09;具体例子&#xff1a; 二、硬件抽象层HAL&#xff1a; &#xff08;1&#xff09;HAL…

Linux如何判断磁盘是否已分区?

在 Linux 系统中&#xff0c;判断磁盘是否已分区可通过以下方法实现&#xff1a; 方法 1&#xff1a;使用 fdisk -l 命令 此命令会列出所有磁盘及其分区的详细信息&#xff1a; sudo fdisk -l输出解读&#xff1a; 若磁盘&#xff08;如 /dev/sdb&#xff09;下有类似 /dev/…

《熔化焊接与热切割作业》考试注意事项

考试前的准备 携带必要的证件和材料&#xff1a;考生需携带身份证、准考证等有效证件&#xff0c;以及考试所需的焊接工具、材料等。确保证件齐全&#xff0c;避免因证件问题影响考试。 提前检查焊接设备和工具&#xff1a;在考试前&#xff0c;考生应仔细检查焊接设备和工具是…

Matlab Hessian矩阵计算(LoG算子)

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 图像的Hessian矩阵用于描述图像灰度值的二阶导数,可以用来分析图像的局部曲率和变化。例如,在图像边缘检测、特征点检测等任务中,Hessian矩阵能帮助我们识别图像的结构。 Hessian矩阵定义 对于二维图像,Hessian…