ElasticsearchのPipeline aggregationsは、ElasticsearchでSQLのHavingっぽいことが可能になるしかけです*1。
これまた品揃えが豊富で、今確認したら、20種類近くあるようですが、次のBucket aggregationsや Metrics aggregationsの記事と同じ論法(シンタックスが似ているAggregationは、似た挙動となるのでまとめて覚えると一度に覚える負担が減る論法!)でPipelineについてもおなじように本記事でまとめてみました。
- 事前準備:
- 1.各バケットでの集計値を見るAggs
- 2. max_bucketなどと同様にbuckets_pathを指定するが累積型の挙動なので親バケットに条件ありAggs
- 3. ソートする(sort)
- 4. bucket_scriptとbucket_selector
- その他のPipeline aggregations
※この記事を書くにあたり、Elasticsearchのバージョンは6.8で確認しました。
事前準備:
この類の話はサンプルデータがあると理解が進みます。
次の過去記事の最初のセクション記載の手順でa_exインデックスを作成ください。
1.各バケットでの集計値を見るAggs
Pipleline aggsのうち、入力に数値を取り扱い、必須プロパティがbuckets_pathでかつ大外のaggsにぶら下げるタイプのもの一覧です。
他に紛れているとわかりづらいですが、こうして抜き出してみると、名前だけで何をしてくれるかなんとなく分かりますね(?)。
このシリーズの例によって(?)、シンタックスのエッセンスをあぶり出す意図で、JSONを扱いやすいJavaScriptの簡易コードを通して、シンタックスを説明します。
a_exインデックスに対して、上記を全部盛りしたクエリを標準出力に出力する、JavaScriptの例は次のとおりです。
// Pipeline aggregationsは前段階の別のAggregationsにのっかることになりますが、その土台のクエリです。 const bucp = 'grp>the_sum'; const base_query = { "size": 0, "aggs": { "grp": { "terms": { "field": "a.keyword" }, "aggs": { "the_sum": { "sum": { "field": "b" } } } } } }; // Pipleline aggsのうち、入力に数値を取り扱い、必須プロパティがbuckets_pathでかつ大外のaggsにぶら下げるタイプのもの一覧 const plagg_names = [ "max_bucket", "min_bucket", "sum_bucket", "stats_bucket", "extended_stats_bucket", ]; const bp1 = (plagg_name, buckets_path) => { // buckets_pathは複数階層の場合、「aaa > bbb」のようなパンくず風に指定する。 return { [plagg_name]: { buckets_path } } }; // (もちろん1つずつ使えますが)比較のため全てのaggsを一度に実行するクエリを生成します。 plagg_names.forEach(pln => { base_query.aggs[`my_${pln}`] = bp1(pln, bucp); }); console.log(JSON.stringify(base_query));
上記を実行して得られるElasticsearchの生クエリは次のものです。
GET a_ex/_search { "size": 0, "aggs": { "grp": { "terms": { "field": "a.keyword" }, "aggs": { "the_sum": { "sum": { "field": "b" } } } }, "my_max_bucket": { "max_bucket": { "buckets_path": "grp>the_sum" } }, "my_min_bucket": { "min_bucket": { "buckets_path": "grp>the_sum" } }, "my_sum_bucket": { "sum_bucket": { "buckets_path": "grp>the_sum" } }, "my_stats_bucket": { "stats_bucket": { "buckets_path": "grp>the_sum" } }, "my_extended_stats_bucket": { "extended_stats_bucket": { "buckets_path": "grp>the_sum" } } } }
これは、先のa_exに対して、フィールドaの登録値で、Bucket Agg(値でグループ化)して、グループ内のbフィールド値の合計をMetrics Agg(のsum)で計算したバケットが得られているところに対し、「max_bucket」でグループごとの合計が最も多いグループを抜き出す、最も少ない(min_bucket)グループを抜き出す、... というクエリ例になっています。
なお、buckets_pathに、grp>the_sumと目印が指定されていますが、これはクエリの上の方の、grpとその配下のthe_sumに至るパスを示すことで、Aggregation結果を参考にmax_bucket等を行いなさいという命令になっているととらえて良いでしょう。
ということでこれの実行結果です。↓
{ "aggregations" : { "grp" : { "buckets" : [ { "key" : "a10", "doc_count" : 5, "the_sum" : { "value" : 50.0 } }, { "key" : "a100", "doc_count" : 5, "the_sum" : { "value" : 500.0 } }, { "key" : "a1000", "doc_count" : 5, "the_sum" : { "value" : 5000.0 } }, { "key" : "a10000", "doc_count" : 5, "the_sum" : { "value" : 50000.0 } }, { "key" : "a100000", "doc_count" : 5, "the_sum" : { "value" : 500000.0 } } ] }, "my_max_bucket" : { "value" : 500000.0, "keys" : [ "a100000" ] }, "my_min_bucket" : { "value" : 50.0, "keys" : [ "a10" ] }, "my_sum_bucket" : { "value" : 555550.0 }, "my_stats_bucket" : { "count" : 5, "min" : 50.0, "max" : 500000.0, "avg" : 111110.0, "sum" : 555550.0 }, "my_extended_stats_bucket" : { "count" : 5, "min" : 50.0, "max" : 500000.0, "avg" : 111110.0, "sum" : 555550.0, "sum_of_squares" : 2.525252525E11, "variance" : 3.81596184E10, "std_deviation" : 195344.87042151886, "std_deviation_bounds" : { "upper" : 501799.7408430377, "lower" : -279579.7408430377 } } } }
2. max_bucketなどと同様にbuckets_pathを指定するが累積型の挙動なので親バケットに条件ありAggs
以下のとおり、前項のmax_bucketと単品のクエリシンタックスは似ていますが、cumulattiveとかderivativeという名のとおり、親バケット(上位のグループ化)が産まれながらに順序性を持つことになるhistogram系Aggバケットが前提になっており、言い換えると親バケットにぶら下げる形でクエリを作るという点でも前項とやや異なります。
const bp2 = { "cumulative_sum": { "buckets_path": "aggname" } } const bp3 = { "derivative": { "buckets_path": "aggname" } } // X-Pack const bp4 = { "cumulative_cardinality": { "buckets_path": "aggname" } }
実際のクエリ例
GET a_ex/_search { "size": 0, "aggs": { "adh": { "date_histogram": { "field": "date", "interval": "1D" }, "aggs": { "the_sum": { "sum": { "field": "b" } }, "cumsum": { "cumulative_sum": { "buckets_path": "the_sum" } }, "derv": { "derivative": { "buckets_path": "the_sum" } } } } } }
derivativeの例はエラーにはならなかったというレベルの例になっていますが、cumsumは累積和になっていることがよくわかる応答が戻ってきます。
3. ソートする(sort)
集計というよりは、バケット一覧をソートします。
ぶら下げたAggのバケット一覧を特定の条件で並び替えます。
* シンタックス例(の擬似表現)
const sort = { "bucket_sort": { "sort": [ { "aggname1": { "order": "asc" } }, { "aggname2": { "order": "desc" } }, ], "from": 1, "size": 3 } }
* 実際のクエリ例
GET a_ex/_search { "size": 0, "aggs": { "grp": { "terms": { "field": "a.keyword" }, "aggs": { "the_sum": { "sum": { "field": "b" } }, "the_avg": { "avg": { "field": "b" } }, "my_max_bucket": { "bucket_sort": { "sort": [ { "the_avg": { "order": "desc" } }, { "the_sum": { "order": "desc" } } ], "from": 1, "size": 3 } } } } } }
この例だと旨味を感じにくいですが、フィールドaの値でグループ分けし、バケットのbフィールドのグループ内の合計の降順、バケットのbフィールドのグループ平均の降順の優先度で得られたMetrix Aggregationバケットの一覧をソートしています。
4. bucket_scriptとbucket_selector
Painless scriptで同じ階層のAggregationで得られた値を変数に見立てて、新たな演算を行ったり(bucket_script)、条件にマッチするもののみフィルター(bucket_selector)します。
ビルトインのままでは難しいものはこちらで頑張るという選択肢を広げてくれますね。
擬似シンタックス表現はこちら↓。
// シンタックスはよく似ています。 // 同じ並びのaggsの値を使って、さらに別の計算値を取得します。script部分で計算式を指定します。 // ここでは、バケットごとのあるフィールドの合計値をバケット内の要素数で割って、平均値を取得する例です。 // (この例であれば、わざわざPipeline aggsを使わなくとも、Metrics aggsのavgで十分ですが、あえてそちらと同じ結果をPipeline aggsで得るという例としました) const script = { "bucket_script": { "buckets_path": { "v1": "the_sum", "v2": "the_count" }, "script": "params.v1 / params.v2" } } // 同じ並びのaggsの値を使って、条件に合致したもののみを取得します。script部分で条件判定の式を指定します。 const select = { "bucket_selector": { "buckets_path": { "v1": "the_max", "v2": "the_min" }, "script": "params.v1 > params.v2 * 2" } }
*クエリ例
実際のクエリ例です。script(自前で平均(avg)を計算)とselector(グループ内の合計が件数の11倍より多いもののみ残す(=このデータ例だとフィールドaが「a10」のデータが脱落)を両方実施しています。
GET a_ex/_search { "size": 0, "aggs": { "grp": { "terms": { "field": "a.keyword" }, "aggs": { "the_sum": { "sum": { "field": "b" } }, "the_count": { "value_count": { "field": "b" } }, "custom_avg": { "bucket_script": { "buckets_path": { "var1": "the_sum", "var2": "the_count" }, "script": "params.var1 / params.var2" } }, "custom_selection":{ "bucket_selector": { "buckets_path": { "var1": "the_sum", "var2": "the_count" }, "script": "params.var1 > params.var2 * 11" } } } } } }
結果イメージ
{ "aggregations" : { "grp" : { "doc_count_error_upper_bound" : 0, "sum_other_doc_count" : 0, "buckets" : [ { "key" : "a100", "doc_count" : 5, "the_count" : { "value" : 5 }, "the_sum" : { "value" : 500.0 }, "custom_avg" : { "value" : 100.0 } }, { "key" : "a1000", "doc_count" : 5, "the_count" : { "value" : 5 }, "the_sum" : { "value" : 5000.0 }, "custom_avg" : { "value" : 1000.0 } }, { "key" : "a10000", "doc_count" : 5, "the_count" : { "value" : 5 }, "the_sum" : { "value" : 50000.0 }, "custom_avg" : { "value" : 10000.0 } }, { "key" : "a100000", "doc_count" : 5, "the_count" : { "value" : 5 }, "the_sum" : { "value" : 500000.0 }, "custom_avg" : { "value" : 100000.0 } } ] } } }
その他のPipeline aggregations
だいぶ俯瞰できた気でいましたが、他にもまだまだありますが、(心の声)解説がむずかしそうなので、別の機会に。 (movingとあるので、移動平均とかこの類の要件の次のレベルのものたちだと思います。そこをとっかかりに理解を深めると良いのではと思います。> 筆者自分自身向け)
自分メモ:それぞれの擬似シンタックス俯瞰表現↓
const avgb = { // tag::avg-bucket-agg-syntax[] "avg_bucket": { "buckets_path": "sales_per_month>sales", "gap_policy": "skip", "format": "#,##0.00;(#,##0.00)" } // end::avg-bucket-agg-syntax[] } const sd = { "serial_diff": { "buckets_path": "the_sum", "lag": 7 } } const mva = the_sum => ({ "moving_avg": { "buckets_path": the_sum, "model": "holt", "window": 5, "gap_policy": "insert_zeros", "settings": { "alpha": 0.8 } } }) const mvf = the_sum => ({ "moving_fn": { "buckets_path": the_sum, "window": 10, "script": "MovingFunctions.min(values)" } }) const mvp = the_percentile => ({ "moving_percentiles": { "buckets_path": "the_percentile", "window": 10 } }) const inference = { "inference": { "model_id": "a_model_for_inference", "inference_config": { "regression_config": { "num_top_feature_importance_values": 2 } }, "buckets_path": { "avg_cost": "avg_agg", "max_cost": "max_agg" } } }
*1:ただし、あくまで親や同列のAggs結果に作用するものです