Logstash 迁移索引元数据(设置和映射)

https://help.aliyun.com/zh/es/use-cases/use-logstash-to-migrate-full-or-incremental-data-from-self-managed-elasticsearch-to-alibaba-cloud-elasticsearch

在进行数据迁移时,Logstash会帮助您自动创建索引,但是自动创建的索引可能与您待迁移的索引存在差异,导致迁移前后数据的格式不一致。因此建议您在数据迁移前,在阿里云Elasticsearch中手动创建目标索引,确保迁移前后索引数据完全一致。

您可以通过Python脚本创建目标索引,具体操作步骤如下:

适配 Python 3.10.9

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import base64
import http.client
import json## 源集群host。
oldClusterHost = "localhost:9200"
## 源集群用户名,可为空。
oldClusterUserName = "elastic"
## 源集群密码,可为空。
oldClusterPassword = "xxxxxx"
## 目标集群host,可在阿里云Elasticsearch实例的基本信息页面获取。
newClusterHost = "jiankunking****.elasticsearch.aliyuncs.com:9200"
## 目标集群用户名。
newClusterUser = "elastic"
## 目标集群密码。
newClusterPassword = "xxxxxx"
DEFAULT_REPLICAS = 0def httpRequest(method, host, endpoint, params="", username="", password=""):conn = http.client.HTTPConnection(host)headers = {}if (username != ""):'Hello {name}, your age is {age} !'.format(name='Tom', age='20')up = ('{username}:{password}'.format(username=username, password=password))# print(up)# print(up.encode())# base64string = base64.encodestring(#     up.encode()).replace('\n', '')base64string = base64.b64encode(up.encode()).decode()print(base64string)headers["Authorization"] = "Basic %s" % base64string;if "GET" == method:headers["Content-Type"] = "application/x-www-form-urlencoded"conn.request(method=method, url=endpoint, headers=headers)else:headers["Content-Type"] = "application/json"conn.request(method=method, url=endpoint, body=params, headers=headers)response = conn.getresponse()res = response.read()return resdef httpGet(host, endpoint, username="", password=""):return httpRequest("GET", host, endpoint, "", username, password)def httpPost(host, endpoint, params, username="", password=""):return httpRequest("POST", host, endpoint, params, username, password)def httpPut(host, endpoint, params, username="", password=""):return httpRequest("PUT", host, endpoint, params, username, password)def getIndices(host, username="", password=""):endpoint = "/_cat/indices"indicesResult = httpGet(oldClusterHost, endpoint, oldClusterUserName, oldClusterPassword)indicesList = indicesResult.decode().split("\n")indexList = []for indices in indicesList:if (indices.find("open") > 0):indexList.append(indices.split()[2])return indexListdef getSettings(index, host, username="", password=""):endpoint = "/" + index + "/_settings"indexSettings = httpGet(host, endpoint, username, password)print(index + "  原始settings如下:\n" + indexSettings.decode())settingsDict = json.loads(indexSettings)## 分片数默认和源集群索引保持一致。number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]## 副本数默认为0。number_of_replicas = DEFAULT_REPLICASnewSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)return newSettingdef getMapping(index, host, username="", password=""):endpoint = "/" + index + "/_mapping"indexMapping = httpGet(host, endpoint, username, password)print(index + " 原始mapping如下:\n" + indexMapping.decode())mappingDict = json.loads(indexMapping)mappings = json.dumps(mappingDict[index]["mappings"])newMapping = "\"mappings\" : " + mappingsreturn newMappingdef createIndexStatement(oldIndexName):settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"return createstatementdef createIndex(oldIndexName, newIndexName=""):if (newIndexName == ""):newIndexName = oldIndexNamecreatestatement = createIndexStatement(oldIndexName)print("新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement)endpoint = "/" + newIndexNamecreateResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)print("新索引 " + newIndexName + " 创建结果:" + createResult.decode())## main
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:if (index.startswith(".")):systemIndex.append(index)else:createIndex(index, index)
if (len(systemIndex) > 0):for index in systemIndex:print(index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/59097.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Pandas】基础操作

📝本文介绍 本文为作者观看pandas入门课后整理的基础操作笔记 👋作者简介:一个正在积极探索的本科生 📱联系方式:943641266(QQ) 🚪Github地址:https://github.com/sankexilianhua 🔑…

【react使用AES对称加密的实现】

react使用AES对称加密的实现 前言使用CryptoJS库密钥存放加密方法解密方法结语 前言 项目中要求敏感信息怕被抓包泄密必须进行加密传输处理,普通的md5加密虽然能解决传输问题,但是项目中有权限的用户是需要查看数据进行查询的,所以就不能直接…

【STM32】INA3221三通道电压电流采集模块,HAL库

一、简单介绍 芯片的datasheet地址: INA3221 三通道、高侧测量、分流和总线电压监视器,具有兼容 I2C 和 SMBUS 的接口 datasheet (Rev. B) 笔者所使用的INA3221是淘宝买的模块 原理图 模块的三个通道的电压都是一样,都是POWER。这个芯片采用…

《机器人SLAM导航核心技术与实战》第1季:第10章_其他SLAM系统

视频讲解 【第1季】10.第10章_其他SLAM系统-视频讲解 【第1季】10.1.第10章_其他SLAM系统_RTABMAP算法-视频讲解 【第1季】10.2.第10章_其他SLAM系统_VINS算法-视频讲解 【第1季】10.3.第10章_其他SLAM系统_机器学习与SLAM-视频讲解 第1季:第10章_其他SLAM系统 …

《HelloGitHub》第 103 期

兴趣是最好的老师,HelloGitHub 让你对编程感兴趣! 简介 HelloGitHub 分享 GitHub 上有趣、入门级的开源项目。 github.com/521xueweihan/HelloGitHub 这里有实战项目、入门教程、黑科技、开源书籍、大厂开源项目等,涵盖多种编程语言 Python、…

【OJ题解】C++实现反转字符串中的每个单词

💵个人主页: 起名字真南 💵个人专栏:【数据结构初阶】 【C语言】 【C】 【OJ题解】 题目要求:给定一个字符串 s ,你需要反转字符串中每个单词的字符顺序,同时仍保留空格和单词的初始顺序。 题目链接: 反转字符串中的所…

Oracle OCP认证考试考点详解082系列09

题记: 本系列主要讲解Oracle OCP认证考试考点(题目),适用于19C/21C,跟着学OCP考试必过。 41. 第41题: 题目 41.Examine the description of the EMPLOYEES table NLS_DATE_FORMAT is set to DD-MON-YY Which query…

创建线程时传递参数给线程

在C中,可以使用 std::thread 来创建和管理线程,同时可以通过几种方式将参数传递给线程函数。这些方法包括使用值传递、引用传递和指针传递。下面将对这些方法进行详细讲解并给出相应的代码示例。 1. 值传递参数 当你创建线程并希望传递参数时&#xff…

Linux下cgdb/gdb调试以及关于操作系统那些事

目录 一.gdb调试 1.1debug和release版本有什么区别? 1.2性能优化 1.3gdb的使用 1.4cgdb的安装 二.什么是硬件 三.冯诺依曼体系 四.操作系统(OS) 4.1理解操作系统 4.1.1操作系统是什么? 4.1.2为什么要有操作系统? 4.1.3 OS-银行 4.1.4OS如何管理 理解库文件和系…

Kafka相关知识点(上)

为什么要使用消息队列? 使用消息队列的主要目的主要记住这几个关键词:解耦、异步、削峰填谷。 解耦: 在一个复杂的系统中,不同的模块或服务之间可能需要相互依赖,如果直接使用函数调用或者 API 调用的方式,会造成模块之间的耦合…

ureport配置方法

1、项目启动后登录这个网址,ip和端口自己系统的 http://localhost:8080/ureport/designer 点击这个地方,图标类似一个文件夹选择下图标注的两个文件,这两个文件就是eoa系统要用到的报表文件,还是点击类似文件夹图标的图标 正在上…

在Linux上搭建Minecraft服务器的方法

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 在 Linux(Ubuntu 12.04)上设置 Minecraft 服务器是一个相当简单的任务,只需通过命令行即可完成。 在…

/检测是否json格式参数;对现在 上月时间参数;JS判断数组(数组对象)是否发现变化;js判断对象是否是空对象

//检测是否json格式参数 isValidJSON(str) { if (/^[\],:{}\s]*$/.test(str.replace(/\\["\\\/bfnrtu]/g, ). replace(/"[^"\\\n\r]*"|true|false|null|-?\d (?:\.\d*)?(?:[eE][ \-]?\d )?/g, ]). replace(/(?:^|:|,)(?:\s*\[) /g, ))) { return t…

Java学习路线:JUnit单元测试

目录 使用JUnit 导入依赖 使用Junit 添加前置/后置操作 当项目十分庞大时,如果想测试一个很小的功能,都要启动整个项目来测试,会很浪费时间。 那能否将某个小功能单独拆出来进行测试呢? 这就是单元测试的作用。而JUnit就是一…

RK3568平台(camera篇)车载摄像头串行器和解串器方案

一.串行器和解串器简介 SerDes是Serializer/Deserializer的缩写,即串行器和解串器。由于同轴线的传输延迟几乎可以忽略不计(ns级别),相当于将原来只能短距离传输的高速并行信号(MIPI/I2C/CLK等)的传输距离延长,真正做到高带宽、低延迟、长距离的数据传输。 串行器(Seri…

问:Redis为什么这么快?

Redis,全称Remote Dictionary Server,是一个开源的高性能键值对数据库。它以其卓越的性能、丰富的数据结构和灵活的使用方式,在现代互联网应用中扮演着重要角色。本文将探讨Redis之所以快的原因,包括其数据结构、内存管理、IO多路…

Qt 窗口部件的焦点策略

setFocusPolicy 函数是 Qt 框架中的一个成员函数,用于设置窗口部件(widget)的焦点策略(focus policy)。具体来说,Qt中的焦点策略(Qt::FocusPolicy)决定了控件是否可以通过键盘&#…

【A】【Maven项目热部署】将Maven项目热部署到远程tomcat服务器上

将Maven项目热部署到远程tomcat中 文章目录 将Maven项目热部署到远程tomcat中1.解决方案:2.实现3.Tomcat中的Root项目的配置和使用4.在tomcat-user.xml中配置远程服务器tomcat的账户信息5.修改 IP 访问权限6.登录ROOT项目,使用Manager App功能管理tomcat…

Scrum价值观

五大价值观 尊重,勇气,专注,承诺,开放 三大支柱(经验主义的三大支柱) 透明度,检查,适应 三大职责(不是三大角色) 产品负责人,开发人员&#xff0c…

过渡,2D,3D 转换

## 3D 转换 ### 1、旋转:rotateX、rotateY、rotateZ javascript transform: rotateX(360deg); //绕 X 轴旋转360度 transform: rotateY(360deg); //绕 Y 轴旋转360度 transform: rotateZ(360deg); //绕 Z 轴旋转360度 1. rotateY 举例: html…