Elasticsearch写入、读取、更新、删除以及批量操作(golang)

目录

1、背景

2、elasticsearch基础操作

2.1 创建es链接客户端

2.2 创建、删除索引

2.3 插入文档

2.3 查询文档

2.4 删除一条文档

2.5 更新文档

2.6 逻辑查询

2.7 滚动查询(批量查询)

2.8 批量插入数据

2.9 批量更新文档

2.10 批量删除文档

3、检索条件设置

4、测试

5、总结 


1、背景

     自从trans到自动驾驶事业部数仓团队后,每天的工作基本都是围绕数据展开,重点是在检索方向上,因此熟练掌握elasticsearch的基本知识成为当务之急,es基础也是提升工作效率的必备技能。

      在项目中,我们使用了官方提供的包:github.com/elastic/go-elasticsearch,但是在使用的过程中,发现这个包比较偏底层,开发成本略高,且不容易理解,因此打算调研一个使用方便、开发成本低的三方包。

      目前网络上使用较多的三方包为:

https://github.com/olivere/elastic

目前最新版本:7.0.32,其发布都是在2年前(2022.3.19),好久没更新了,但是使用热度依然不减。下面就使用这个包来完成es文档的基本操作,包括增加、删除、更新、查询以及对应的批量操作。

2、elasticsearch基础操作

2.1 创建es链接客户端

首先定义一个es实例结构:

type EsInstance struct {Client *elastic.Client    // es客户端Index  map[string]Indexes // 所有索引
}type Indexes struct {Name    string `json:"name"`    // 索引名称Mapping string `json:"mapping"` // 索引结构
}

创建实例:

package elasticimport ("github.com/olivere/elastic/v7"
)type EsInstance struct {Client *elastic.Client    // es客户端Index  map[string]Indexes // 所有索引
}type Indexes struct {Name    string `json:"name"`    // 索引名称Mapping string `json:"mapping"` // 索引结构
}func NewEsInstance() (*EsInstance, error) {client, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"), // 支持多个服务地址,逗号分隔elastic.SetBasicAuth("user_name", "user_password"),elastic.SetSniff(false), // 跳过ip检查,默认是true)if err != nil {return &EsInstance{}, err}return &EsInstance{Client: client,Index:  map[string]Indexes{},}, nil
}

2.2 创建、删除索引

对于索引,如果我们只是要做普通的标签检索,索引的mapping结构不需要单独设置,如果要支持向量检索,索引的mapping是需要手动创建的,创建索引:

func (es *EsInstance) CreateIndex(index string, mapping string) error {// 判断索引是否存在exists, err := es.Client.IndexExists(index).Do(context.Background())if err != nil {return err}// 索引存在,直接返回if exists {return nil}// 创建索引createIndex, err := es.Client.CreateIndex(index).BodyString(mapping).Do(context.Background()) // 指定索引名称和结构if err != nil {return err}if !createIndex.Acknowledged {return errors.New("create index failed")}es.Index[index] = Indexes{Name: index, Mapping: mapping}return nil
}

删除索引:

func (es *EsInstance) DeleteIndex(index string) error {// 判断索引是否存在exists, err := es.Client.IndexExists(index).Do(context.Background())if err != nil {return err}// 索引不存在,直接返回if !exists {return nil}// 删除索引deleteIndex, err := es.Client.DeleteIndex(index).Do(context.Background())if err != nil {return err}if !deleteIndex.Acknowledged {return errors.New("delete index failed")}return nil
}

2.3 插入文档

定义文档结构:

type Record struct {Index  string `json:"_index"` // 索引名称Type   string `json:"_type"`  // 索引类型Id     string `json:"_id"`    // 索引IDSource Source `json:"source"` // 文档内容
}
type Source struct {Id   string `json:"id"`   // 学号Name string `json:"name"` // 姓名Age  int    `json:"age"`  // 年龄Sex  string `json:"sex"`  // 性别
}

插入文档,文档id自定义,如果不传入id,es将会自动生成一个id:


func (es *EsInstance) InsertOneRecord(indexName string, record Record) error {_, err := es.Client.Index().Index(indexName).Id(record.Source.Id).BodyJson(record).Do(context.Background())if err != nil {return err}return nil
}

2.3 查询文档

根据文档id获取一条文档。

func (es *EsInstance) GetOneRecord(indexName, id string) (*Record, error) {record := &Record{}fmt.Println("index name: ", indexName, " id: ", id)result, err := es.Client.Get().Index(indexName).Id(id).Do(context.Background())if err != nil {fmt.Printf("Error getting a document: %s\n", err.Error())return record, err} else {fmt.Println("Document got!")}if result.Found {err := json.Unmarshal(result.Source, &record.Source)if err != nil {fmt.Printf("Error unmarshalling the source: %s\n", err.Error())}}record.Id = result.Idrecord.Index = result.Indexrecord.Type = result.Typereturn record, nil
}

2.4 删除一条文档

根据文档id删除一条文档。

func (es *EsInstance) DeleteOneRecord(indexName, id string) error {_, err := es.Client.Delete().Index(indexName).Id(id).Do(context.Background())if err != nil {fmt.Printf("Error deleting a document: %s\n", err.Error())}return nil
}

2.5 更新文档

根据文档id更新文档:

func (es *EsInstance) UpdateOneRecord(indexName, id string, record Source) error {_, err := es.Client.Update().Index(indexName).Id(id).Doc(record).Do(context.Background())if err != nil {fmt.Printf("Error updating a document: %s\n", err.Error())}return nil
}

2.6 逻辑查询

根据自定义的dsl进行数据检索,该函数不支持滚动,只能进行单次检索,每次最多1W条:

// 只返回设定的条数,最多10000
func (es *EsInstance) Search(indexName string, size int, query elastic.Query) ([]Record, error) {var json = jsoniter.ConfigCompatibleWithStandardLibraryrecords := make([]Record, 0)if size > 10000 {size = 10000} else if size < 1 {size = 1}searchResult, err := es.Client.Search().Index(indexName).Query(query).Size(size).Do(context.Background())if err != nil {fmt.Printf("Error searching: %s\n", err.Error())return records, err} else {fmt.Println("Search successful!")}if searchResult.Hits.TotalHits.Value > 0 {for _, hit := range searchResult.Hits.Hits {record := &Record{}err := json.Unmarshal(hit.Source, &record.Source)if err != nil {fmt.Printf("Error unmarshalling the source: %s\n", err.Error())} else {record.Id = hit.Idrecords = append(records, *record)}}}return records, err
}

2.7 滚动查询(批量查询)

滚动查询的意思是,分批多次查询,直到检索到全部数据:

// 滚动搜索,每次最多1000条
func (es *EsInstance) SearchScroll(indexName string, size int, query elastic.Query) ([]Record, error) {var json = jsoniter.ConfigCompatibleWithStandardLibraryrecords := make([]Record, 0)if size > 1000 {size = 1000} else if size < 1 {size = 1}searchResult, err := es.Client.Scroll(indexName).Query(query).Size(size).Do(context.Background())if err != nil {fmt.Printf("Error searching: %s\n", err.Error())return records, err}if searchResult.Hits.TotalHits.Value > 0 {for _, hit := range searchResult.Hits.Hits {record := &Record{}err := json.Unmarshal(hit.Source, &record.Source)if err != nil {fmt.Printf("Error unmarshalling the source: %s\n", err.Error())} else {record.Id = hit.Idrecord.Index = hit.Indexrecord.Type = hit.Typerecords = append(records, *record)}}}scrollId := searchResult.ScrollIdfmt.Printf("\n..........begin scroll...........\n")for {scrollResult, err := es.Client.Scroll().ScrollId(scrollId).Do(context.Background())if err != nil {fmt.Printf("Error searching: %s\n", err.Error())return records, err}// 检查是否还有结果if scrollResult.Hits.TotalHits.Value == 0 {break}// 更新滚动IDscrollId = scrollResult.ScrollId// 处理结果for _, hit := range scrollResult.Hits.Hits {record := &Record{}err := json.Unmarshal(hit.Source, &record.Source)if err != nil {fmt.Printf("Error unmarshalling the source: %s\n", err.Error())} else {record.Id = hit.Idrecord.Index = hit.Indexrecord.Type = hit.Typerecords = append(records, *record)}}}es.Client.ClearScroll(scrollId).Do(context.Background())return records, err
}

这个函数,只是演示了滚动查询的操作步骤,在实际使用中,不能这样操作,很有可能会出现因为数据量过大导致的内存不够问题,理想的操作步骤应该是滚动一次处理一次数据,而不是把所有数据全部检索出来单次返回。

2.8 批量插入数据

为了提升操作es的性能,批量操作肯定是首选,批量操作主要使用Bulk的api,如下:

func (es *EsInstance) BatchInserRecords(indexName string, records []Record) error {req := es.Client.Bulk().Index(indexName)for _, record := range records {u := &record.Sourcedoc := elastic.NewBulkIndexRequest().Id(record.Id).Doc(u)req.Add(doc)}if req.NumberOfActions() < 0 {fmt.Printf("no actions to bulk insert\n")return nil}if _, err := req.Do(context.Background()); err != nil {fmt.Println("bulk insert error:" + err.Error())return err}return nil
}

2.9 批量更新文档

批量更新和批量插入大同小异,值更换一个api即可:

func (es *EsInstance) BatchUpdateRecords(indexName string, records []Record) error {req := es.Client.Bulk().Index(indexName)for _, record := range records {u := &record.Sourcedoc := elastic.NewBulkUpdateRequest().Id(record.Id).Doc(u)req.Add(doc)}if req.NumberOfActions() < 0 {fmt.Printf("no actions to bulk insert\n")return nil}if _, err := req.Do(context.Background()); err != nil {fmt.Println("bulk insert error:" + err.Error())return err}return nil
}

2.10 批量删除文档

func (es *EsInstance) BatchDeleteRecords(indexName string, records []Record) error {req := es.Client.Bulk().Index(indexName)for _, record := range records {doc := elastic.NewBulkDeleteRequest().Id(record.Id)req.Add(doc)}if req.NumberOfActions() < 0 {fmt.Printf("no actions to bulk insert\n")return nil}if _, err := req.Do(context.Background()); err != nil {fmt.Println("bulk insert error:" + err.Error())return err}return nil
}

3、检索条件设置

一般情况下,bool查询使用的比较多,就以bool查询为例说明,构造bool查询:

boolQuery := elastic.NewBoolQuery()

增加一个match查询条件:

boolQuery.Filter(elastic.NewMatchQuery("tag_name", "turn_left"))

增加一个range查询:

boolQuery.Filter(elastic.NewRangeQuery("start_time").Gte(1723737600000))

其他各类的检索条件我们可以根据自身需要进行设置,如果对dsl有一定的了解,这些api使用起来非常简单。

4、测试

假设要检索某辆车、某个时间段内路口左转的数据,构造dsl代码如下:

boolQuery.Filter(elastic.NewMatchQuery("tag_name", "turn_left"))
boolQuery.Filter(elastic.NewMatchQuery("car_id", "京A0001"))
boolQuery.Filter(elastic.NewRangeQuery("start_time").Gte(1723737600000))
boolQuery.Filter(elastic.NewRangeQuery("end_time").Lte(1723823999000))

之后调用2.7章节的滚动查询函数,则可以检索出所有数据:

SearchScroll("index_2024", 4000, boolQuery)

5、总结 

以上总共介绍了10个api,基本上就满足了日常对es操作的需求,测试、验证适用于使用单条检索,这样方便验证数据的准确性,批量操作用于验证数据、逻辑没有问题后的操作,支持批量主要是为了提升效率,以快速完成对数据的操作。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/52874.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

探索Unity与C#的无限潜能:从新手到高手的编程之旅

在数字创意与技术创新交织的今天&#xff0c;Unity游戏引擎凭借其强大的跨平台能力和灵活的编程接口&#xff0c;成为了无数开发者心中的首选。而C#&#xff0c;作为Unity的官方脚本语言&#xff0c;更是以其面向对象的特性和丰富的库支持&#xff0c;为游戏开发注入了无限可能…

Golang | Leetcode Golang题解之第375题猜数字大小II

题目&#xff1a; 题解&#xff1a; func getMoneyAmount(n int) int {f : make([][]int, n1)for i : range f {f[i] make([]int, n1)}for i : n - 1; i > 1; i-- {for j : i 1; j < n; j {f[i][j] j f[i][j-1]for k : i; k < j; k {cost : k max(f[i][k-1], f[…

关于Scrapy的那些事儿(四)Scrapy Shell

Scrapy Shell launch Scrapy shell 使用如下命令&#xff1a; scrapy shell <url>当运行scrapy shell的时候&#xff0c;它为我们提供了一些功能函数&#xff1a; shelp() :打印可用对象和快捷命令的帮助列表fetch&#xff08;request or url&#xff09;&#xff1a;…

CMake构建学习笔记11-minizip库的构建

准确来说&#xff0c;minizip其实是zlib提供的辅助工具&#xff0c;位于zlib库的contrib文件夹内。minizip提供了更为高级一点的接口&#xff0c;能直接操作文件进行压缩。不过&#xff0c;有点麻烦的是这个工具并没有提供CMake构建的方式。那么可以按照构建giflib的方式&#…

Java开发工程师-匹配性岗位(借鉴性质)

1.匹配性质 技能迁移:Java开发工程师通常具备较强的编程能力、逻辑思维和问题解决能力,这些技能可以迁移到其他领域。行业选择:考虑目前行业趋势以及未来发展方向,Java工程师可以转向大数据、人工智能、云计算等等领域。个人兴趣与职业发展:转行时个人的兴趣和职业发展规划…

1.1量化交易的定义与魅力

Hey&#xff0c;Python高手们&#xff0c;欢迎来到量化交易的世界&#xff01;在这里&#xff0c;我们不谈风花雪月&#xff0c;只谈数字和代码。量化交易&#xff0c;听起来是不是有点像是“用电脑代替人脑做交易”的黑科技&#xff1f;没错&#xff0c;你猜对了&#xff01; …

【JAVA入门】Day28 - 数据结构

【JAVA入门】Day28 - 数据结构 文章目录 【JAVA入门】Day28 - 数据结构一、栈二、队列三、数组3.1 ArrayList 四、链表4.1 LinkedList 五、二叉树5.1 二叉查找树5.2 二叉树的遍历方式5.3 平衡二叉树5.4 平衡二叉树的旋转5.5 平衡二叉树需要旋转的几种情况 六、红黑树6.1 红黑规…

永成防回水防回气装置煤矿毫不犹豫选择

永成防回水防回气装置煤矿毫不犹豫选择&#xff0c;不敢说我们有多好&#xff0c;我们只把简单的事做好&#xff0c;用心服务&#xff0c;因为品质&#xff0c;所以信任。因为信任&#xff0c;所以值得选择。 本防回水防回气装置是一种用于煤矿瓦斯管路爆渣和燃烧时防止回火、…

abstract类

1 问题 abstract类不能用 new运算符创建对象&#xff0c;必须产生其子类&#xff0c;由子类创建对象。若 abstract类的类体中有 abstract()方法&#xff0c;则只允许声明&#xff0c;不能带有方法体&#xff0c;而该类的子类必须实现 abstract()方法。一个abstract类只关心子类…

3_1_PID控制原理

自从计算机进入控制领域以来&#xff0c;用数字计算机代替模拟计算机调节器组成计算机控制系统&#xff0c;不仅可以用软件实现PID控制算法&#xff0c;而且可以利用计算机的逻辑功能&#xff0c;使PID控制更加灵活。数字PID控制在生产过程中是一种最普遍采用的控制方法&#x…

[Algorithm][综合训练][奇数位丢弃][求和][计算字符串的编辑距离]详细讲解

目录 1.奇数位丢弃1.题目链接2.算法原理详解 && 代码实现 2.求和1.题目链接2.算法原理详解 && 代码实现 3.计算字符串的编辑距离1.题目链接2.算法原理详解 && 代码实现 1.奇数位丢弃 1.题目链接 奇数位丢弃 2.算法原理详解 && 代码实现 解法…

YOLOv9改进策略【损失函数篇】| 利用MPDIoU,加强边界框回归的准确性

一、背景 目标检测和实例分割中的关键问题&#xff1a; 现有的大多数边界框回归损失函数在不同的预测结果下可能具有相同的值&#xff0c;这降低了边界框回归的收敛速度和准确性。 现有损失函数的不足&#xff1a; 现有的基于 ℓ n \ell_n ℓn​范数的损失函数简单但对各种尺度…

计算机网络速成(三)

一、网络协议与模型 什么是协议&#xff1f; 协议是指计算机系统中完成特定任务所必需的规则和约定&#xff0c;特别是数据传输和交换的规则和约定。OSI和TCP/IP是什么&#xff1f; OSI&#xff08;开放式系统互连参考模型&#xff09;是一种网络架构模型&#xff0c;将网络系…

linux配置jenkins环境

目录 一、安装javal环境 二、安装ansible 三、安装git 四、配置服务器免密登陆 五、安装jenkins 一、安装java环境 本次安装jdk的版本为11&#xff0c;jdk的版本需要和jenkins版本相匹配 1、下载jdk&#xff0c;可以去oracle官网注册账号进行下载&#xff0c;不想折腾的可…

Redis与SpringMVC的整合与最佳实践

整合Redis与Spring MVC&#xff08;现在通常是Spring Boot的一部分&#xff09;可以提高应用性能&#xff0c;特别是在处理大量数据缓存和会话状态管理方面。 下面是一些关于如何整合Redis与Spring MVC的最佳实践&#xff1a; 1. 引入依赖 首先&#xff0c;你需要在你的项目中…

【Java】Maven多环境切换实战(实操图解)

Java系列文章目录 补充内容 Windows通过SSH连接Linux 第一章 Linux基本命令的学习与Linux历史 文章目录 Java系列文章目录一、前言二、学习内容&#xff1a;三、问题描述四、解决方案&#xff1a;4.1 Maven多环境配置学习4.2 切换环境4.2.1 先打包4.2.2 之后可以切换 五、总结…

【ACM独立出版 | 厦大主办】第五届计算机科学与管理科技国际学术会议(ICCSMT 2024,10月18-20)

第五届计算机科学与管理科技国际学术会议(ICCSMT 2024) 定于2024年10月18-20日在中国厦门举行。 会议旨在为从事“计算机科学”与“管理科技”研究的专家学者、工程技术人员、技术研发人员提供一个共享科研成果和前沿技术&#xff0c;了解学术发展趋势&#xff0c;拓宽研究思路…

设计模式结构型模式之适配器模式

结构型模式之适配器模式 一、概述和使用场景1、概述2、使用场景&#xff1a;3、主要分类 二、 代码示例1、类适配器模式2、接口适配器3、对象适配器 四、总结1、适配器模式2、适配器模式的优点3、适配器模式的缺点 一、概述和使用场景 1、概述 适配器模式是一种结构型设计模式…

视频修复学习笔记

目录 PGTFormer 人脸修复 PGTFormer 人脸修复 PGTFormer&#xff08;Parsing-Guided Temporal-Coherent Transformer&#xff09; &#xff0c;这是第一个专门为视频人脸恢复设计的方法。PGTFormer采用了端到端的设计&#xff0c;摒弃了传统方法中的复杂对齐步骤&#xff0c…

React项目-less、antd配置

一、创建Reract项目 1、创建React项目 npx create-react-app react-test 2、运行eject Cesium静态资源需要webpack配置&#xff0c;执行npm run eject可以生成webpack配置&#xff0c;运行前先查看当前git版本是否有提交&#xff0c;如果未提交&#xff0c;需要先本地提交gi…