Golang操作ES全系列(olivere & curl操作es)
🚀全部代码(欢迎👏🏻star):
https://github.com/ziyifast/ziyifast-code_instruction/tree/main/go-demo/go-es
1 olivere
创建client
package mainimport ("crypto/tls""fmt""github.com/olivere/elastic/v7""net""net/http""time"
)var (//host = "http://localhost:9200"host = "http://es.xx.ziyi.com"
)func main() {esClient, err := elastic.NewClient(elastic.SetURL(host),elastic.SetSniff(false),elastic.SetBasicAuth("", ""),elastic.SetHttpClient(&http.Client{Transport: &DecoratedTransport{tp: &http.Transport{Proxy: http.ProxyFromEnvironment,DialContext: (&net.Dialer{Timeout: 30 * time.Second,KeepAlive: 30 * time.Second,}).DialContext,ForceAttemptHTTP2: true,MaxIdleConns: 100,IdleConnTimeout: 90 * time.Second,TLSHandshakeTimeout: 10 * time.Second,ExpectContinueTimeout: 1 * time.Second,TLSClientConfig: &tls.Config{InsecureSkipVerify: true,},},}}),)if err != nil {panic(err)}fmt.Println(esClient)
}type DecoratedTransport struct {tp http.RoundTripper
}func (d *DecoratedTransport) RoundTrip(request *http.Request) (*http.Response, error) {request.Host = "es.xx.ziyi.com"return d.tp.RoundTrip(request)
}
检测索引是否存在
func isExistIndex(esClient *elastic.Client) {isExist, err := esClient.IndexExists("test-ziyi-1-100004-100136").Do(context.TODO())if err != nil {panic(err)}if isExist {println("index exists")} else {println("index not exists")}
}
创建索引
func createIndex(esClient *elastic.Client) {type m map[string]interface{}indexMapping := m{"settings": m{"number_of_shards": 5, //分片数"number_of_replicas": 1, //副本数},"mappings": m{"properties": m{ //索引属性值"book_name": m{ //索引属性名"type": "text", //filed类型//"analyzer": "ik_max_word", //使用ik分词器进行分词"index": true, //当前field可以被用于查询条件"store": false, //是否额外存储},"author": m{"type": "keyword", //作为关键字不分词},"word_count": m{"type": "long",},"on_sale_time": m{"type": "date","format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",},"book_desc": m{"type": "text",//"analyzer": "ik_max_word",},},},}result, err := esClient.CreateIndex("test-ziyi-1-100004-100136").BodyJson(indexMapping).Do(context.Background())if err != nil {panic(err)}if result.Acknowledged {println("create index success")} else {println("create index failed")}
}
删除索引
# 除了程序我们也可以先通过curl查看索引是否存在
curl "http://localhost:9200/_cat/indices?v" | grep ziyi
func deleteIndex(esClient *elastic.Client) {response, err := esClient.DeleteIndex("test-ziyi-1-100004-100136").Do(context.Background())if err != nil {panic(err)}println(response.Acknowledged)
}
添加记录doc
# _doc/1 # 表明查询id为1的doc文档
# test-ziyi-1-100004-100136指定索引名
curl -X GET "http://localhost:9200/test-ziyi-1-100004-100136/_doc/1?pretty"
func addDoc(esClient *elastic.Client) {type m map[string]interface{}documentMappings := m{"book_name": "士兵突击","author": "兰晓龙","word_count": 100000,"on_sale_time": "2000-01-05","book_desc": "一个关于部队的...",}//如果不指定id,则es会自动生成一个id(杂乱无序不好维护),response为返回的文档id//response, err := esClient.Index().Index("test-ziyi-1-100004-100136").BodyJson(documentMappings).Do(context.Background())response, err := esClient.Index().Index("test-ziyi-1-100004-100136").Id("1").BodyJson(documentMappings).Do(context.Background()) //指定idif err != nil {panic(err)}println(response.Id)
}
更新doc记录
func updateDoc(esClient *elastic.Client) {type m map[string]interface{}docMappings := m{"book_name": "士兵突击","author": "袁朗","word_count": 100000,"on_sale_time": "2000-01-05","book_desc": "一个关于部队的...",}//覆盖式修改(response返回doc记录的id)response, err := esClient.Update().Index("test-ziyi-1-100004-100136").Id("1").Doc(docMappings).Do(context.Background())//指定字段修改//response, err := esClient.Update().Index("test-ziyi-1-100004-100136").Id("1").Doc(map[string]interface{}{// "book_name": "我的团长我的团",//}).Do(context.Background())if err != nil {panic(err)}println(response.Id)
}
删除doc记录
// deleteDoc deletes document "1" from "test-ziyi-1-100004-100136" and prints
// the id from the response. It panics on any error; per the original author's
// note, deleting a document that does not exist yields a not-found error.
func deleteDoc(esClient *elastic.Client) {
	deleteService := esClient.Delete().
		Index("test-ziyi-1-100004-100136").
		Id("1")

	removed, err := deleteService.Do(context.Background())
	if err != nil {
		panic(err)
	}
	println(removed.Id)
}
批处理
func sendBulkRequest(esClient *elastic.Client) {bulkRequest := esClient.Bulk()for i := 0; i < 10; i++ {docMappings := map[string]interface{}{"book_name": fmt.Sprintf("士兵突击-%d", i),"author": "袁朗","on_sale_time": "2000-01-05","book_desc": "一个关于部队的...",}bulkRequest = bulkRequest.Add(elastic.NewBulkIndexRequest().Index("test-ziyi-1-100004-100136").Doc(docMappings))}bulkResponse, err := bulkRequest.Do(context.Background())if err != nil {panic(err)}if bulkResponse.Errors {for _, item := range bulkResponse.Items {for _, action := range item {if action.Error != nil {fmt.Printf("Error for item: %s: %s", action.Error.Index, action.Error.Reason)}}}} else {fmt.Println("All bulk requests executed successfully")}
}
普通查询
func simpleSearch(esClient *elastic.Client) {response, err := esClient.Search([]string{"test-ziyi-1-100004-100136"}...).Query(elastic.NewTermQuery("author", "袁朗")).Size(100).Do(context.TODO())if err != nil {panic(err)}fmt.Println(response.Hits.Hits)
}
searchAfter翻页查询
func searchAfterSearch(esClient *elastic.Client) {var lastHit *elastic.SearchHitfor {q := elastic.NewBoolQuery().Must(elastic.NewTermQuery("book_name", "士"))searchSource := elastic.NewSearchSource().Query(q).Size(2).Sort("_id", false)if lastHit != nil {fmt.Printf("search After %+v\n", lastHit.Sort)searchSource.SearchAfter(lastHit.Sort...)}dsl, err := searchSource.MarshalJSON()if err != nil {panic(err)}fmt.Printf("dsl %s\n", string(dsl))searchResult, err := esClient.Search().Index("test-ziyi-1-100004-100136").SearchSource(searchSource).Do(context.Background())if err != nil {panic(err)}if len(searchResult.Hits.Hits) == 0 {fmt.Println("no more data")break}for _, hit := range searchResult.Hits.Hits {res := make(map[string]interface{})if err = json.Unmarshal(hit.Source, &res); err != nil {panic(err)}fmt.Printf("search %s %s\n", hit.Id, res["author"])}lastHit = searchResult.Hits.Hits[len(searchResult.Hits.Hits)-1]}
}
全部代码
package mainimport ("context""crypto/tls""encoding/json""fmt""github.com/olivere/elastic/v7""net""net/http""time"
)var (host = "http://test.ziyi.com"
)func main() {esClient := CreateEsClient()fmt.Println(esClient)//1. 操作索引//isExistIndex(esClient)//createIndex(esClient)//deleteIndex(esClient)//2. 操作doc文档(记录)//addDoc(esClient)//updateDoc(esClient)//deleteDoc(esClient)//3. 批处理请求//sendBulkRequest(esClient)//4. 查询//simpleSearch(esClient)//searchAfterSearch(esClient)}func searchAfterSearch(esClient *elastic.Client) {var lastHit *elastic.SearchHitfor {q := elastic.NewBoolQuery().Must(elastic.NewTermQuery("book_name", "士"))searchSource := elastic.NewSearchSource().Query(q).Size(2).Sort("_id", false)if lastHit != nil {fmt.Printf("search After %+v\n", lastHit.Sort)searchSource.SearchAfter(lastHit.Sort...)}dsl, err := searchSource.MarshalJSON()if err != nil {panic(err)}fmt.Printf("dsl %s\n", string(dsl))searchResult, err := esClient.Search().Index("test-ziyi-1-100004-100136").SearchSource(searchSource).Do(context.Background())if err != nil {panic(err)}if len(searchResult.Hits.Hits) == 0 {fmt.Println("no more data")break}for _, hit := range searchResult.Hits.Hits {res := make(map[string]interface{})if err = json.Unmarshal(hit.Source, &res); err != nil {panic(err)}fmt.Printf("search %s %s\n", hit.Id, res["author"])}lastHit = searchResult.Hits.Hits[len(searchResult.Hits.Hits)-1]}
}func simpleSearch(esClient *elastic.Client) {response, err := esClient.Search([]string{"test-ziyi-1-100004-100136"}...).Query(elastic.NewTermQuery("author", "袁朗")).Size(100).Do(context.TODO())if err != nil {panic(err)}fmt.Println(response.Hits.Hits)
}func sendBulkRequest(esClient *elastic.Client) {bulkRequest := esClient.Bulk()for i := 0; i < 10; i++ {docMappings := map[string]interface{}{"book_name": fmt.Sprintf("士兵突击-%d", i),"author": "aa","on_sale_time": "2000-01-05","book_desc": "一个关于部队的...",}bulkRequest = bulkRequest.Add(elastic.NewBulkIndexRequest().Index("test-ziyi-1-100004-100136").Doc(docMappings))}bulkResponse, err := bulkRequest.Do(context.Background())if err != nil {panic(err)}if bulkResponse.Errors {for _, item := range bulkResponse.Items {for _, action := range item {if action.Error != nil {fmt.Printf("Error for item: %s: %s", action.Error.Index, action.Error.Reason)}}}} else {fmt.Println("All bulk requests executed successfully")}
}func deleteDoc(esClient *elastic.Client) {//response返回删除的doc Id,如果要删除的doc不存在,则直接返回err not foundresponse, err := esClient.Delete().Index("test-ziyi-1-100004-100136").Id("1").Do(context.Background())if err != nil {panic(err)}println(response.Id)
}func updateDoc(esClient *elastic.Client) {type m map[string]interface{}docMappings := m{"book_name": "士兵突击","author": "袁朗","word_count": 100000,"on_sale_time": "2000-01-05","book_desc": "一个关于部队的...",}//覆盖式修改(response返回doc记录的id)response, err := esClient.Update().Index("test-ziyi-1-100004-100136").Id("1").Doc(docMappings).Do(context.Background())//指定字段修改//response, err := esClient.Update().Index("test-ziyi-1-100004-100136").Id("1").Doc(map[string]interface{}{// "book_name": "我的团长我的团",//}).Do(context.Background())if err != nil {panic(err)}println(response.Id)
}func addDoc(esClient *elastic.Client) {type m map[string]interface{}documentMappings := m{"book_name": "士兵突击","author": "兰晓龙","word_count": 100000,"on_sale_time": "2000-01-05","book_desc": "一个关于部队的...",}//如果不指定id,则es会自动生成一个id(杂乱无序不好维护),response为返回的文档id//response, err := esClient.Index().Index("test-ziyi-1-100004-100136").BodyJson(documentMappings).Do(context.Background())response, err := esClient.Index().Index("test-ziyi-1-100004-100136").Id("1").BodyJson(documentMappings).Do(context.Background()) //指定idif err != nil {panic(err)}println(response.Id)
}func deleteIndex(esClient *elastic.Client) {response, err := esClient.DeleteIndex("test-ziyi-1-100004-100136").Do(context.Background())if err != nil {panic(err)}println(response.Acknowledged)
}func createIndex(esClient *elastic.Client) {type m map[string]interface{}indexMapping := m{"settings": m{"number_of_shards": 5, //分片数"number_of_replicas": 1, //副本数},"mappings": m{"properties": m{ //索引属性值"book_name": m{ //索引属性名"type": "text", //filed类型//"analyzer": "ik_max_word", //使用ik分词器进行分词"index": true, //当前field可以被用于查询条件"store": false, //是否额外存储},"author": m{"type": "keyword", //作为关键字不分词},"word_count": m{"type": "long",},"on_sale_time": m{"type": "date","format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",},"book_desc": m{"type": "text",//"analyzer": "ik_max_word",},},},}result, err := esClient.CreateIndex("test-ziyi-1-100004-100136").BodyJson(indexMapping).Do(context.Background())if err != nil {panic(err)}if result.Acknowledged {println("create index success")} else {println("create index failed")}
}func isExistIndex(esClient *elastic.Client) {isExist, err := esClient.IndexExists("test-ziyi-1-100004-100136").Do(context.TODO())if err != nil {panic(err)}if isExist {println("index exists")} else {println("index not exists")}
}func CreateEsClient() *elastic.Client {esClient, err := elastic.NewClient(elastic.SetURL(host),elastic.SetSniff(false),elastic.SetBasicAuth("", ""),elastic.SetHttpClient(&http.Client{Transport: &DecoratedTransport{tp: &http.Transport{Proxy: http.ProxyFromEnvironment,DialContext: (&net.Dialer{Timeout: 30 * time.Second,KeepAlive: 30 * time.Second,}).DialContext,ForceAttemptHTTP2: true,MaxIdleConns: 100,IdleConnTimeout: 90 * time.Second,TLSHandshakeTimeout: 10 * time.Second,ExpectContinueTimeout: 1 * time.Second,TLSClientConfig: &tls.Config{InsecureSkipVerify: true,},},}}),)if err != nil {panic(err)}return esClient
}type DecoratedTransport struct {tp http.RoundTripper
}func (d *DecoratedTransport) RoundTrip(request *http.Request) (*http.Response, error) {request.Host = "test.ziyi.com"return d.tp.RoundTrip(request)
}
2 go-elasticsearch
searchAfter翻页查询
package mainimport ("context""crypto/tls""encoding/json""fmt""github.com/cenkalti/backoff/v4""github.com/elastic/go-elasticsearch/v7""github.com/elastic/go-elasticsearch/v7/estransport""net""net/http""os""strings""time"
)var (url = []string{"http://test.ziyi.com"}username = ""password = ""sort = json.RawMessage(`[{"_id":{"order":"desc"}}]`)aggs = json.RawMessage(`{"size": {"sum": {"field": "size"}},"count":{"value_count": {"field": "_id"}}}`)size = 2indices = []string{"test-ziyi-1-100004-100136"}
)func main() {esClient, err := CreateClient(url, username, password)if err != nil {panic(err)}var searchAfter []interface{}for {dsl := Dsl{Sort: sort,Size: size,SearchAfter: searchAfter,Query: map[string]interface{}{"bool": map[string]interface{}{"must": map[string]interface{}{"wildcard": map[string]interface{}{"book_name": "士",},//"match_all": map[string]interface{}{},},},},}queryJson, err := json.MarshalIndent(dsl, "", "\t")if err != nil {panic(err)}fmt.Printf("queryJson:%s\n", queryJson)res, err := esClient.Search(esClient.Search.WithContext(context.Background()),esClient.Search.WithIndex(indices...),esClient.Search.WithBody(strings.NewReader(string(queryJson))),esClient.Search.WithTrackTotalHits(false),)if err != nil {panic(err)}var result struct {Hits struct {Hits []struct {Index string `json:"_index"`ID string `json:"_id"`Sort []interface{} `json:"sort"`Source map[string]interface{} `json:"_source"`} `json:"hits"`} `json:"hits"`}if err := json.NewDecoder(res.Body).Decode(&result); err != nil {panic(err)}err = res.Body.Close()if err != nil {panic(err)}if len(result.Hits.Hits) > 0 {lastHit := result.Hits.Hits[len(result.Hits.Hits)-1]searchAfter = lastHit.Sort} else {break}for _, h := range result.Hits.Hits {fmt.Printf("=====id:%s book_name:%s\n", h.ID, h.Source["book_name"])}}
}type Dsl struct {Sort json.RawMessage `json:"sort"`Size int `json:"size"`SearchAfter []interface{} `json:"search_after,omitempty"`Query map[string]interface{} `json:"query"`
}func CreateClient(url []string, username, password string) (*elasticsearch.Client, error) {es, err := elasticsearch.NewClient(genConfig(url, username, password))if err != nil {panic(err)return nil, err}res, err := es.Info()if err != nil {panic(err)return nil, err}defer res.Body.Close()return es, nil}type DecoratedTransport struct {tp http.RoundTripper
}func (d *DecoratedTransport) RoundTrip(request *http.Request) (*http.Response, error) {request.Host = "test.ziyi.com"return d.tp.RoundTrip(request)
}func genConfig(url []string, username, password string) elasticsearch.Config {retryBackoff := backoff.NewExponentialBackOff()cfg := elasticsearch.Config{Addresses: url,Logger: &estransport.ColorLogger{Output: os.Stdout},Username: username,Password: password,RetryOnStatus: []int{502, 503, 504, 429},RetryBackoff: func(i int) time.Duration {if i == 1 {retryBackoff.Reset()}return retryBackoff.NextBackOff()},MaxRetries: 5,Transport: &DecoratedTransport{tp: &http.Transport{Proxy: http.ProxyFromEnvironment,DialContext: (&net.Dialer{Timeout: 30 * time.Second,KeepAlive: 30 * time.Second,}).DialContext,ForceAttemptHTTP2: true,MaxIdleConns: 100,IdleConnTimeout: 90 * time.Second,TLSHandshakeTimeout: 10 * time.Second,ExpectContinueTimeout: 1 * time.Second,TLSClientConfig: &tls.Config{InsecureSkipVerify: true,},},},}return cfg
}
3 拓展
es基础概念
索引index(database)
类型type(table),6.x起每个索引只允许一个type,7.x弃用,8.x移除
文档doc(row)
属性field(column)
curl操作es
curl localhost:9200/_cluster/health # 查看集群健康状态
curl localhost:9200/_cat/pending_tasks # 查看任务堆积详情
curl localhost:9200/_cluster/state/metadata # 查看集群元数据状态信息
curl localhost:9200/_cluster/stats # 查看集群指标统计信息
curl localhost:9200/_cluster/allocation/explain # 查看集群分片分配详情
curl localhost:9200/_cluster/allocation/explain #查看集群分片分配详情
curl http://localhost:9200/test-*/_count # 统计文档总数(记录数)
curl localhost:9200/_cluster/settings # 查看集群settings信息
curl localhost:9200/_tasks
curl http://localhost:9200/test-ziyi-1-100000-218/_mapping # 查看索引信息
curl -X GET "http://es.test.ziyi.com/test-ziyi-1-100004-100136/_doc/1?pretty" # 查询id为1的文档记录
curl http://localhost:9200/_cat/indices?v # 查看各个索引记录数,?v带上表头,展示详细信息
curl -X DELETE "http://192.168.100.88:9200/my_index" # 删除索引(包含索引结构)
# 删除索引数据不包含索引结构
curl -X POST \
  -H 'Content-Type: application/json' \
  -d '{"query":{"match_all":{}}}' \
  'http://localhost:9200/test-ziyi-metadata-1-100004-100136/_delete_by_query?pretty=true'
# 进入es pod,执行查询命令
curl -X POST "http://your_elasticsearch_host:9200/test-ziyi-metadata-1-10000-3/_search" -H 'Content-Type: application/json' -d '
{"size": 100,"sort": [{ "mtime": { "order": "desc" } },{ "key": { "order": "desc" } }],"query": {"range": {"size": {"gt": 10485760}}}
}'
例如:查看索引信息