Elasticsearch aggregation queries with the Go client
In the monitoring project I am currently working on, I needed an aggregation query against Elasticsearch, implemented in Go:
for a given IP and a given time range, compute the average of each monitoring metric per unit of time.
That is a bit of a mouthful; the Elasticsearch query below shows it more clearly, and you can see it aggregates the two fields cpu and mem.
Also, the time zone must be set explicitly, otherwise every bucket is shifted by 8 hours (China Standard Time).
GET /monitor/v1/_search
{
  "query": {
    "bool": {
      "must": [
        { "term":  { "ip": { "value": "192.168.1.100" } } },
        { "range": { "datatime": { "gte": 1545634340000, "lte": "now" } } }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "AVG_Metric": {
      "date_histogram": {
        "field": "datatime",
        "interval": "day",
        "time_zone": "Asia/Shanghai",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "aggs": {
        "avg_mem": { "avg": { "field": "mem" } },
        "avg_cpu": { "avg": { "field": "cpu" } }
      }
    }
  }
}
A single document looks roughly like this.
datatime is a millisecond timestamp, and the field must be mapped as type date in the index, otherwise the aggregation will not work:
{
  "cpu": 5,
  "mem": 77088,
  "datatime": 1545702661000,
  "ip": "192.168.1.100"
}
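For reference, a minimal mapping sketch that satisfies this requirement. This is an assumption, not the original index definition: it uses ES 6.x single-type syntax, and the types for ip, cpu, and mem are guessed from the sample document; only datatime being a date is essential here.

```json
PUT /monitor
{
  "mappings": {
    "v1": {
      "properties": {
        "ip":       { "type": "keyword" },
        "cpu":      { "type": "long" },
        "mem":      { "type": "long" },
        "datatime": { "type": "date" }
      }
    }
  }
}
```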
The query result looks like this.
You can see that three buckets come back in buckets; the fields mean:
key_as_string and key: the bucket's start time, formatted and as a millisecond timestamp
doc_count: how many documents were aggregated into this bucket
avg_mem and avg_cpu: the aliases defined in the query, each holding its average value.
{
  "took": 4,
  "timed_out": false,
  "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 },
  "hits": { "total": 2477, "max_score": 0, "hits": [] },
  "aggregations": {
    "AVG_Metric": {
      "buckets": [
        {
          "key_as_string": "2018-12-24 00:00:00",
          "key": 1545580800000,
          "doc_count": 402,
          "avg_mem": { "value": 71208.1592039801 },
          "avg_cpu": { "value": 4.338308457711443 }
        },
        {
          "key_as_string": "2018-12-25 00:00:00",
          "key": 1545667200000,
          "doc_count": 1258,
          "avg_mem": { "value": 77958.16852146263 },
          "avg_cpu": { "value": 4.639904610492846 }
        },
        {
          "key_as_string": "2018-12-26 00:00:00",
          "key": 1545753600000,
          "doc_count": 817,
          "avg_mem": { "value": 86570.06609547124 },
          "avg_cpu": { "value": 4.975520195838433 }
        }
      ]
    }
  }
}
Below is the same query implemented with the Go Elasticsearch client "github.com/olivere/elastic"; I am using v6.
Reading the aggregation results out of the response gave me some trouble: I could not find examples online or in the official docs. I eventually got two approaches working; I recommend the first, since it saves defining two extra structs. The code:
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/olivere/elastic"
)

type Aggregations struct {
	AVG_Metric AVG_Metric `json:"AVG_Metric"`
}

type AVG_Metric struct {
	Buckets []Metric `json:"buckets"`
}

type Metric struct {
	Key       int64 `json:"key"`
	Doc_count int64 `json:"doc_count"`
	Avg_mem   Value `json:"avg_mem"`
	Avg_cpu   Value `json:"avg_cpu"`
}

type Value struct {
	Value float64 `json:"value"`
}

var client, _ = elastic.NewSimpleClient(elastic.SetURL("http://127.0.0.1:9200"))

func main() {
	// Filter by IP and time range.
	boolSearch := elastic.NewBoolQuery().
		Filter(elastic.NewTermsQuery("ip", "192.168.1.100")).
		Filter(elastic.NewRangeQuery("datatime").Gte(1545634340000).Lte("now"))

	// The metrics to average.
	mem := elastic.NewAvgAggregation().Field("mem")
	cpu := elastic.NewAvgAggregation().Field("cpu")

	// Bucket by day on the datatime field, with the two sub-aggregations.
	aggs := elastic.NewDateHistogramAggregation().
		Interval("day").
		Field("datatime").
		TimeZone("Asia/Shanghai").
		SubAggregation("avg_mem", mem).
		SubAggregation("avg_cpu", cpu)

	// Run the query.
	result, err := client.Search().
		Index("monitor").
		Type("v1").
		Query(boolSearch).
		Size(0).
		Aggregation("AVG_Metric", aggs).
		Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	// First approach: pull the raw bucket JSON out of the named
	// aggregation and unmarshal it straight into []Metric.
	var m []Metric
	term, _ := result.Aggregations.Terms("AVG_Metric")
	for _, bucket := range term.Aggregations {
		b, _ := bucket.MarshalJSON()
		//fmt.Println(string(b))
		json.Unmarshal(b, &m)
		for _, v := range m {
			fmt.Println(v)
		}
	}

	// Second approach: re-marshal the whole aggregations map and
	// unmarshal it into the Aggregations struct defined above.
	var a Aggregations
	b, _ := json.Marshal(result.Aggregations)
	//fmt.Println(string(b))
	json.Unmarshal(b, &a)
	for _, v := range a.AVG_Metric.Buckets {
		fmt.Println(v)
	}
}
Both approaches print the same buckets:
{1545580800000 402 {71208.1592039801} {4.338308457711443}}
{1545667200000 1258 {77958.16852146263} {4.639904610492846}}
{1545753600000 965 {87164.17409326424} {4.972020725388601}}
{1545580800000 402 {71208.1592039801} {4.338308457711443}}
{1545667200000 1258 {77958.16852146263} {4.639904610492846}}
{1545753600000 965 {87164.17409326424} {4.972020725388601}}