はてだBlog(仮称)

私的なブログど真ん中のつもりでしたが、気づけばWebサイト系のアプリケーション開発周りで感じたこと寄りの自分メモなどをつれづれ述べています。2020年6月現在、Elasticsearch、pandas、CMSなどに関する話題が多めです。...ですが、だんだんとより私的なプログラムのスニペット置き場になりつつあります。ブログで述べている内容は所属組織で販売している製品などに関するものではなく、また所属する組織の見解を代表するものではありません。

検索エンジン Elasticsearchのスコアリングの頑張り方についてmy講釈(というか雑ポエム)

はじめに

itdepends.hateblo.jp

上記でfunction_scoreのお試しをしたこともあり、Elasticsearchのスコアリングについて講釈をたれてみたくなったので、記事にしました。

まえおき等

確認したのは、Elasticsearch6.4です。ただし、特にクエリ例などは出てきません。また、うっすら想定しているのは「レストラン検索サイト」的なものです。

下記の講釈は、他の例でもあてはまるものはあると思いますが、逆にちょっと違うかもと思ったあなたはおそらくそれが正しいと思います。Good Questionです。

また、ググってこの記事に流れてきた方には申し訳有りませんが、クエリDSLのワードはチラチラ出てくるもの、見てのとおりあくまで単語のみです。ひょっとすると前の記事の方がお求めのものかもしれませんので、もしよかったら上記リンクの方をごらんください。

スコアリング講釈

さて、その講釈なのですが、いきなりの画像貼り付けです。そして絵の割に字が多い。

だったらせめてslideshareでもというところがそうでもなかったり... でも良いんです。独り言だから。

スコアリングとは

シンプルなパターンマッチのGREPや通常のSQLと異なり、検索エンジンユースケースでは自由形式のドキュメントが対象です。

検索語も、表記揺れから始まって誤記や同義語の表現違い、広義語などユーザー側もこれまた自由形式です。

また、ドキュメントの方に戻ると、検索語と同じ語が何回も出てくるドキュメントは一般に、検索語の出現数が少ないドキュメントよりはユーザーが探しているものに近いと思われますが、

そんな中、ユーザーが求めているドキュメントにより近いと思われるものについて、様々な要素・観点で採点してやり、ベターなものを返してあげましょう、というのがスコアリングです。

このあたりでキャッチの絵があっても良さそうですが無し

例えばElasticsearchのスコアリングは何ができる

例えばですが、Elasticsearchのスコアリングではこんなことができます。というのを以下にまとめてみました。

ただし、ここでは、体系的にまとめたというよりは、先のレストラン検索サイト的なものをうっすら頭に浮かべているのと、私の「講釈」「主張」に寄せて偏りがあるのでご了承ください。 (本当に必要な場合には、Elasticsearchというかluceneを源流とする検索エンジンはいろいろできると思いますが、ここでは私の主張と知っている範囲のバイアスがあることにご注意ください。)

f:id:azotar:20190504133910p:plain

オレオレ用語集(個人的には気に入っている)

なお、スコアリングに出てくる加減乗除は、全てスコアリング、加点・減点、ブースト係数、重み係数どうとでも呼べるのですが、これがくせもので、ディスカッションなどで下手に厳密に使い分けると必要以上に難しくなるのと、一方でごちゃまぜだとそれはそれで議論にならないというジレンマに遭遇します。

このコミュニケーションに関する問題についてこれといった解決方法は思い当たらないのですが、上記の図でも一部示唆しているとおり、定義無しで使えそうな用語として次の用語を勢いで使って話をするようにしています。

  1. できばえ点: TF-IDFとかBM25周辺で今だったら検索エンジンがよしなに検索してくれる採点のこと
  2. 配点: 私自身は結局、スコアリングは各要素点のバランスや全体の大小関係の納得度の高い組み合わせをひねり出すことだと考えています。その際に、他の採点が同じ場合に、ある採点条件で決勝戦になる場合に、階級Aは10点、Bは5点というようなことを議論することになるのですが、この様は、実際はひとつずつ採点をすると言うよりは全体のパイを優先度の高いものに大きく割り当てる様のようなイメージを持っているので、「割り当て・配っている感」をイメージしやすい「配点」という言葉を使うことにしています。
  3. ボーナス: 基本の検索条件そのものではなく、基本の検索条件にヒットした前提のもと、その文書が特定の属性や条件に該当した場合に、追加で加点される類のものを言う。
  4. コンボ: 複数の採点基準を複雑に噛み合わせてさらに加点するなどの例。個人的には格ゲーで使われるコンボをイメージ。(ちなみに、「コンボ」はあまりやらない方が良いというのが私の主張です。)
  5. 持ち点:(少なくとも)検索エンジニアは、検索時に決まるスコアと最初から決まっているスコア(該当した時に加点されるもの)の概念はしっかり分けて考えるべきだと思っています。後者は「特権」となることも多くその雰囲気が伝わるではと思い「持ち点」と呼ぶことにしています。前者も良い呼び名があればと思いますが、今のところしっくりくるものを思いつかず。

それぞれ、もっとキャッチーなワードがあればどなたか教えてください。

スコアリング要件のあるある例とアプローチ

f:id:azotar:20190504133921p:plain

f:id:azotar:20190504133929p:plain

まとめ

ということで(特に定義はしていませんのでアレですが)レストラン検索サイトを検討しているとして、私がスコアリングの検討の際にこんなやり方をしたら良いと思っているポイント絞って再度まとめます。

◆私の思うスコアリングとは...のまとめの表

f:id:azotar:20190504133934p:plain

さいごに

今後時間があれば、上記の講釈にしたがってプログラムやクエリで事例風のものを提示して(逆につっこみを受けて)みたい気持ちがあることを述べて、ひとまずこの記事は終わりとします。

ElasticsearchのPythonクライアントでPandasを使って手軽にANALYZERの有効PoCをやってみるアドホックツール(の習作)

はじめに

Elasticsearch(6.4)、PythonおよびPandas関連のやってみた系の記事です。あと、ElasticsearchのPythonクライアントを使ってみたという内容も含んでいます。ElasticsearchのANALYZERの話もあまり詳しくは解説していませんが、うっすらそれとなく含まれます。

ただし、Elasticsearchの検索そのものの話かというとちょいと違います。

...ので、そこは次項・次々項に示したストーリーをごらんください。

この記事での演習の背景イメージ

RDBSQLのLIKE検索で頑張っていたところをElasticsearchなど検索エンジンにおきかえるという案件があるとします。

このような場合、RDBで頑張るために、データ中の特定のワードについて辞書を用いたり一律半角全角変換などを行い、LIKEを全文検索に曖昧検索風にヒットするようにその環境なりの工夫をしているということもあると思います。

きっと、置き換え用の対応表なども管理されているでしょう。また致し方ないのかもしれませんが、そのような対応表は数万行にも及ぶという例もあるでしょう。

このような対応表のメンテが限界になり検索エンジンを使うことで対応表のメンテナンスを最小限にしたいという現場もあるかもしれません。

演習ストーリーでのゴール

ここでは、この置き換え用の対応表のうち、どれとどれがElasticsearchでとあるANALYZERを行うことで不要になり、逆にANALYZER設定をその方針で進めるとした場合に、個別の辞書として引き続きメンテした方が良いものかというのを炙りだすことを考えます。

【インプット】

行番号 text0 text1
0 ニュース ニュース
1 ビッグサイティング ビッグサイティング
2 ルヴァヌ ルヴァヌ
3 ヴァニラ バニラ
4 ヴァニラ ヴァニラ
5 バニラハウス レバニラハウス

※text1がこのワードが入力されたら、text0とみなして検索して欲しいというものの例。 バニラで検索したらヴァニラが当たって欲しいよね...ということになります。まあ、この例は今回の設定の範囲では「残念ながらヒットしない」例になるのですが...

※ここでは、どっちが新でどっちが旧か、またそもそも新旧なのかという話はありますが、便宜上、上記の対応表を新旧対応表と呼ぶことにします。

... のような、置き換え用の新旧対応表をインプットに、今後あるANALYZER設定を行ったElasticsearchでの検索に切り替えた以降も引き続き同義語の辞書として残す対象をあぶり出します。

ANALYZER設定にicu系を設定するストーリーだとすると、この例の場合は、次の3と5が引き続き辞書として残す必要がある設定で、他はElasticsearch(とANALYZERの頑張り)により今後はメンテ不要となる想定です。

【期待するアウトプット】

行番号 text0 text1
3 ヴァニラ バニラ
5 バニラハウス レバニラハウス

アプローチ

次のような考え方でアプローチします。

  1. 上記のINPUT相当の対応表CSVをDataFrameに読み込む。手動1件ずつ確認するのは現実的でない件数が対象となる。
  2. DataFrameの要素(対応表にある新旧ワード)に対して繰り返し(ただしapplymapによる宣言的な適用になるため見た目上forループ記述は無し)、ANALYZERをかけて戻り値の「token」のみ抜き出した配列を確保する。
  3. 得られた新旧の「token」の配列を比較して、同じようなANALYZE結果が得られるなら、類義語辞書の個別対応必要無し...とみなせるというやり方で対象を炙り出せるのではないか。

何もないところからやるにはいろいろ段取りが必要ですが、PandasとElasticsearchの公式Pythonクライアントがあれば、これらの公式Rの見よう見まねレベルで組み合わせて味付けすることでやりたいことができそうです。

その他この演習の前提

一応、この演習で前提とした「検索要件」の背景を補足しておきます。読者の方のお立場によっては自明と思われる内容を記載している面もあるので、先にプログラム例を見てから戻ってきていただいて構わないような話です。

  1. AND検索を前提としています。例えば、国際展示場と入力されたら、「国際」かつ「展示場」を共に含むようなデータを検索したいという既存要件を従来どおりカバーできるかというものです。*1
  2. ANALYZERを使う...というストーリーのとおり、match系のクエリでの検索をイメージしています。
  3. ANALYZERは後述のPythonのコード例のコメントに書いてある設定のとおりとします。 また、Elasticsearchのsynonym設定は行なっていない体です*2。→なのでANALYZERをもっと頑張ったり他の工夫次第では、もっと良い結果を得られる伸び代があるという類のものです。

Pythonのプログラム例

上記のPythonのプログラム例です。

from elasticsearch import Elasticsearch, helpers
from elasticsearch.client import IndicesClient
import pandas as pd
from pandas.io.json import json_normalize
import sys


#初期化
es = Elasticsearch(host='localhost', port=9200)
INDEX = "es_index2"
#https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.client.IndicesClient
ic = IndicesClient(es)
"""
# es_index2は例えば次のようにアナライザー設定されているとします。
# 今回は、my_ja_default_analyzerの効き具合を確認します。

PUT es_index2
{
  "settings": {
    "analysis": {
      "tokenizer": {
        "my_kuromoji_tokenizer":{ "type": "kuromoji_tokenizer", "mode": "search" },
        "my_ngram_tokenizer":{ "type": "ngram","min_gram":2,"max_gram":3, "token_chars":["letter","digit" ]  }
      },
      "analyzer": {
        "my_ja_default_analyzer": { 
          "type": "custom", "tokenizer": "my_kuromoji_tokenizer",
          "char_filter": ["icu_normalizer","kuromoji_iteration_mark","html_strip" ],
      "filter": [ "kuromoji_baseform", "kuromoji_part_of_speech", "ja_stop", "lowercase", "kuromoji_number", "kuromoji_stemmer" ]
        },
        "my_kuromoji_readingform_analyzer": {
          "type": "custom", "tokenizer": "my_kuromoji_tokenizer",
          "char_filter": [ "icu_normalizer","kuromoji_iteration_mark","html_strip" ],
          "filter": [ "kuromoji_readingform", "kuromoji_part_of_speech", "ja_stop", "lowercase", "kuromoji_stemmer" ]
        },
        "my_ngram_analyzer":{ 
          "type":"custom", "tokenizer":"my_ngram_tokenizer",
          "char_filter": ["icu_normalizer","html_strip"], "filter": [ ]
        }
      }
    }
  },
  "mappings": {
    "_doc": {
            "properties":{
        "location":{"type":"geo_point"}
      },
      
      "dynamic_templates": [
        {
          "hybrid_style_for_string": {
            "match_mapping_type": "string",
            "mapping": {
              "analyzer": "my_ja_default_analyzer",
              "fielddata": true, "store": true,
              "fields": {
            "readingform":{ "type":"text", "analyzer":"my_kuromoji_readingform_analyzer" },
            "ngram":{ "type":"text","analyzer":"my_ngram_analyzer" },
        "raw": { "type":"keyword" }
              }
            }
          }
        }
      ]
    }
  }
}

"""

COLUMN_LABEL = ["text0","text1"]

def 新旧比較ワード全件読み込み():
    #実際はCSVからそれなりの件数を読み込みする想定
    text0 = "ニュース,ビッグサイティング,ルヴァヌ,ヴァニラ,ヴァニラ,バニラハウス".split(",")
    text1 = "ニュース,ビッグサイティング,ルヴァヌ,バニラ,ヴァニラ,レバニラハウス".split(",")
    df = pd.DataFrame(data=[text0,text1],index=COLUMN_LABEL).T
    print(df,file=sys.stderr)
    return df


df = 新旧比較ワード全件読み込み()

# ランチャー
def ElasticsearchAnalyzerランチャー(ic,analyzer,text,explain=False):
    body = {"analyzer":analyzer,
                "text":text}
    if (explain == True):
        body["explain"] = "true"
    return ic.analyze(index=INDEX, body=body)

# ANALYZERの戻り値から値を抜き出すためのツール
def dict配列のフラット配列変換(dictlist,func=lambda x: x["token"] ):
    """
    [{f1:"aaa",f2:"bbb"},{f1:"ccc",f2:"ddd"},..]のような配列を
    ["aaa","ccc",...]のようなオブジェクトに変換する
    ※実際はfuncで引き渡す関数次第となる。funcのデフォルトは、dictの中に「token」というプロパティが必ず含まれる前提の例。
    """
    dstlist = []
    for v in dictlist:
        dstlist.append(func(v))
    return dstlist        

def Esアナライズ適用(x):
    analyzer = "my_ja_default_analyzer" #CONFIG
    explain = False
    text = x 
    x = ElasticsearchAnalyzerランチャー(ic,analyzer,text,explain)
    xtokens = x["tokens"]
    # 今回は途中経過は標準エラーに出力
    json_normalize(xtokens).to_csv(sys.stderr,sep='\t')
    return dict配列のフラット配列変換(xtokens)

# 一括アナライズ
dfalz = df.applymap(Esアナライズ適用)
#新旧の比較ワードそれぞれでElasticsearchのアナライズで得られた「tokens」の集合を用いた比較
#  ポイント:添え字1のアナライズ結果のtokensが、添え字1のアナライズ結果のtokensにすべて含まれていればOKとみなす。つまりこの例だと、dfalz["chk"]の値が0の場合OKという扱いになる。
dfalz["chk"] = dfalz.apply(lambda s: len(list(set(s[COLUMN_LABEL[1]])-set(s[COLUMN_LABEL[0]]))) ,axis=1) 
# 結果レポート
filterrows = dfalz["chk"] > 0
# dfalz["chk"] が1以上、つまり検索時に同値と見なされないワードが含まれているものを出力
print(dfalz[filterrows],file=sys.stderr) 
print(df[filterrows])


実行結果イメージ

上記を保存して、コマンドラインなどで起動すると次のような結果が出力されます。

      text0      text1  chk
3      ヴァニラ      バニラ    1
5   バニラ, ハウス    レバニラハウス    1
    text0    text1
3    ヴァニラ      バニラ
5  バニラハウス  レバニラハウス

似たようなものを生のPythonの標準モジュールだけ、あるいはhttpを扱いやすいのでNode.jsなどJavaScript系で頑張るという場合に比べて、公式クライアント、Pandasがそろうことで細かいところをショートカットして今回の目的で言えば、取り回しの良いツールになった気がします。どうでしょうか。

この組み合わせは自分にとってはなかなか便利だったのでご紹介したく記事にしてみました。

免責事項というかご注意事項

ElasticsearchやPandasをありあわせで短めの記事としてまとめていますので、例えば上記のサンプルコードですが、Pythonicな例にはなっていないと思います。エラー処理なども甘く、本格的なコードとしてはかけている部分がいくつかありますし、関数アノテーションや型タイプの指定をした方が良いものもあるでしょう。

また、Elasticsearchの真のパワーを引き出す類の例ではなく、本ストーリーのもとのでサンプルコードとなりますのでその点ご注意ください。

関連の「公式」系の情報源など

やってみた系の記事を書いておいていうのもなんですが、やはり「公式」をあたってみるのが一番近道です。

ということで公式R関連の記事をリンクしておきます。

elasticsearch-py.readthedocs.io

pandas.pydata.org

www.elastic.co

また、ElasticsearchのANALYZERの「公式」関連というところで、Elastic日本の中の人が作成されている便利なプラグインがあるので、こちらをリンクさせていただきます。

discuss.elastic.co

*1:もちろん、「国際」または「展示場」の例も簡単な応用で確認できると思います。

*2:今回はむしろ類義語辞書にsynonym設定した方が良いワードをあぶり出すいとなみですので...

ElasticsearchでGEO系クエリで遊びます。ついでにfunction_scoreも勉強してみます。

Elasticsearch GEO系クエリとこの記事の概要

GEO系クエリにはいくつかありますが、さくっと試しやすいのが、 geo_bounding_boxとgeo_distanceクエリです。

前者は、検索したい四方の北西と南東の座標を検索条件にあたえて、そのBOX内の座標を持つドキュメントを検索します。

つまり、四角の検索です。

後者は、中心点の座標と距離を与えて検索します。

つまり、円形の検索です。

ここでは、前者を中心に試してみました。

また、距離による並べ替えなど、function_scoreなどスコアリング、加えてPainless Scriptによる検索時の動的フックも少し試して見ています。

今回、私が動作確認したlasticsearchのバージョンは6.4です。

www.elastic.co

ここでは紹介しませんが、ポリゴンで検索できたりといったこともできます。 また、geo_shapeクエリでは、「Spatial Relations/ Spatial Strategies」というオプションがあり、INTERSECTS、DISJOINT、WITHIN、CONTAINSの切り替えができるようです。 これらはつまるところ、なんらかの「検索条件の枠」と検索対象のドキュメント側の「地域の枠」どおしが、含まれるか、重なっているか...などによってマッチするかどうかの扱いをコントロールできるようです。 私は試していませんが、そういうユースケースもしばしばあると思いますし、SQLでは実現しにくい検索条件なので非常におもしろいですよね。((ちょっとパフォーマンスが気になりますね))

検索データの登録

とりあえずkibanaのDevToolsなどで、「loc」フィールドが geo_pointタイプになるように設定

PUT ginza
{
    "mappings": {
        "_doc": {
            "properties": {
                "loc": {"type": "geo_point"}
            }
        }
    }
}

東京メトロ銀座線のデータをバルクロードする。 *1

あと、銀座線はこちら。

www.tokyometro.jp

銀座線は渋谷を起点に、おおよそ西から東に向けて走っています。 ↓ https://www.google.co.jp/maps/search/%E9%8A%80%E5%BA%A7%E7%B7%9A/@35.6106894,139.6903769,12z/data=!3m1!4b1

今回用いるデータのバルクロードの例

POST ginza/_doc/_bulk
{"index":{"_id":1}}
{"id":1,"name":"渋谷","loc":[139.701238,35.658871], "x":5,"y":3 }
{"index":{"_id":2}}
{"id":2,"name":"表参道","loc":[139.712314,35.665247], "x":2 }
{"index":{"_id":3}}
{"id":3,"name":"外苑前","loc":[139.717857,35.670527], "x":0 }
{"index":{"_id":4}}
{"id":4,"name":"青山一丁目","loc":[139.724159,35.672765], "x":2 }
{"index":{"_id":5}}
{"id":5,"name":"赤坂見附","loc":[139.737047,35.677021], "x":4 }
{"index":{"_id":6}}
{"id":6,"name":"溜池山王","loc":[139.741419,35.673621], "x":3 }
{"index":{"_id":7}}
{"id":7,"name":"虎ノ門","loc":[139.749832,35.670236], "x":0 }
{"index":{"_id":8}}
{"id":8,"name":"新橋","loc":[139.758432,35.667434], "x":3, "y":2 }
{"index":{"_id":9}}
{"id":9,"name":"銀座","loc":[139.763965,35.671989], "x":2 }
{"index":{"_id":10}}
{"id":10,"name":"京橋","loc":[139.770126,35.676856], "x":0 }
{"index":{"_id":11}}
{"id":11,"name":"日本橋","loc":[139.773516,35.682078], "x":2 }
{"index":{"_id":12}}
{"id":12,"name":"三越前","loc":[139.773594,35.687101], "x":2,"y":1 }
{"index":{"_id":13}}
{"id":13,"name":"神田","loc":[139.770899,35.693587], "x":0, "y":1 }
{"index":{"_id":14}}
{"id":14,"name":"末広町","loc":[139.771713,35.702972], "x":0 }
{"index":{"_id":15}}
{"id":15,"name":"上野広小路","loc":[139.772877,35.70768], "x":2 }
{"index":{"_id":16}}
{"id":16,"name":"上野","loc":[139.777122,35.711482], "x":3,"y":2 }
{"index":{"_id":17}}
{"id":17,"name":"稲荷町","loc":[139.782593,35.711273], "x":0 }
{"index":{"_id":18}}
{"id":18,"name":"田原町","loc":[139.790897,35.709897], "x":0 }
{"index":{"_id":19}}
{"id":19,"name":"浅草","loc":[139.797592,35.710733], "x":2, "y":1 }

geo_bounding_boxによるGEO検索

前セクションでインデックスに登録した銀座線の全部の駅を含むようなBOXの頂点ですが、

北西

"35.711482,139.701238"

南東

"35.658871,139.797592"

となりますね。

なので、ちょっとだけ余裕を持った

北西 35.712,139.7 南東 35.657,139.798

の範囲を検索してみます。

POST ginza/_doc/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "geo_bounding_box": {
            "loc": {
              "top_left": "35.712,139.7",
              "bottom_right": "35.657,139.798"
            }
          }
        }
      ]
    }
  }
}

↓ 

登録した19件が返ってきました。
(ちなみに、全てscoreは0)

「_geo_distance」プロパティを使った中心点からのソート

続いて、渋谷駅の座標を中心点にして、ソートします。

つまり、目論見どおりであれば、銀座線の始発から終点の浅草駅まで路線の並び順にソートされるハズ。

POST ginza/_doc/_search
{
  "size":100,
  "query": { "match_all" : {}},
    "sort": [
    {
      "_geo_distance": {
        "loc": "35.658871,139.701238",
        "order": "asc"
      }
    }
  ]
}  

↓ 戻り値イメージ

{
  "hits": {
    "hits": [
      {
        "_source": {
          "id": 1,
          "name": "渋谷"
        }
      },
      {
        "_source": {
          "id": 2,
          "name": "表参道"
        }
      },
      {
        "_source": {
          "id": 3,
          "name": "外苑前"
        }
      },
      {
        "_source": {
          "id": 4,
          "name": "青山一丁目"
        }
      },
      {
        "_source": {
          "id": 5,
          "name": "赤坂見附"
        }
      },
      {
        "_source": {
          "id": 6,
          "name": "溜池山王"
        }
      },
      {
        "_source": {
          "id": 7,
          "name": "虎ノ門"
        }
      },
      {
        "_source": {
          "id": 8,
          "name": "新橋"
        }
      },
      {
        "_source": {
          "id": 9,
          "name": "銀座"
        }
      },
      {
        "_source": {
          "id": 10,
          "name": "京橋"
        }
      },
      {
        "_source": {
          "id": 11,
          "name": "日本橋"
        }
      },
      {
        "_source": {
          "id": 12,
          "name": "三越前"
        }
      },

ちなみに、応答の「"sort"」の中に、メートル距離らしきものが入ってきています(ので、これを距離と見てもよさそうです)。

あと、本格的にいろいろ考える場合は、sortのところについて次の公式Rの「ignore_unmapped」のあたりの解説も確認しておきましょう。

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html#geo-sorting

また、そもそもこの公式Rの続きのところのscript_based_sortingのところにあるように、Painless Scriptでソートの条件を指定できます。 話の流れの都合でここでは述べませんが、後述のPainless Scriptの例などを使うと、「_geo_distance」をもっとひねったカスタムな条件のソートもできそうですね。

前2つを、組み合わせてみます。

POST ginza/_doc/_search
{
  "size":100,
  "query": {
    "bool": {
      "filter": [
        {
          "geo_bounding_box": {
            "loc": {
              "top_left": "35.712,139.7",
              "bottom_right": "35.657,139.798"
            }
          }
        }
      ]
    }
  },
  "sort": [
    {
      "_geo_distance": {
        "loc": "35.658871,139.701238",
        "order": "asc"
      }
    }
  ]
}

↓
結果は略

距離の取得(script_fields、arcDistance、planeDistanceの利用)

距離によるソートはしないものの、検索の中心点からの距離が欲しい、というストーリーで、これらを個別に取得してみます。

POST ginza/_doc/_search
{
  "size":100,
  "query": {
    "bool": {
            "must": [
          {"match_all":{}}
      ]
    }
  },
  "sort": [
    "id"
  ],
  "script_fields": {
     "a_dist": {
         "script": {
             "lang": "painless",
             "source": "doc['loc'].arcDistance(35.658871,139.701238)"
         }},
      "p_dist": {
         "script": {
             "lang": "painless",
             "source": "doc['loc'].planeDistance(35.658871,139.701238)"
         }}
 },
 "_source":["name","loc"]
}

なお、arcDistance、planeDistanceというのは、painlessのGEO系のAPIです。

名前からなんとなくお察しします(手抜き)

https://www.elastic.co/guide/en/elasticsearch/painless/7.0/painless-api-reference.html

上記クエリの例だと、a_dist、p_distというフィールドに距離が取得できます。

        "_source": {
          "loc": [
            139.701238,
            35.658871
          ],
          "name": "渋谷"
        },
        "fields": {
          "p_dist": [
            0.001059014854358351
          ],
          "a_dist": [
            0
          ]
        },
        "sort": [
          1
        ]
      },
      {
        "_index": "ginza",
        "_type": "_doc",
        "_id": "2",
        "_score": null,
        "_source": {
          "loc": [
            139.712314,
            35.665247
          ],
          "name": "表参道"
        },
        "fields": {
          "p_dist": [
            1226.3385782660855
          ],
          "a_dist": [
            1226.3385739214464
          ]
        },

渋谷が0、表参道が1.2kmぐらいというところなのであっているのかな。

painlessで距離取得の補足事項

ここで次のドキュメントをインデックスに追加します。

座標情報無しですね。....

POST ginza/_doc/_bulk
{"index":{"_id":20}}
{"id":20,"name":"惑星プロメシューム"}

上記のデータを追加したのち、先ほどと同様に、(全件取得しつつ)取得できた駅への渋谷駅からの距離を取得します。

POST ginza/_doc/_search
{
  "size":100,
  "query": {
    "bool": {
            "must": [
          {"match_all":{}}
      ]
    }
  },
  "sort": [
    "id"
  ],
  "script_fields": {
     "a_dist": {
         "script": {
             "lang": "painless",
             "source": "doc['loc'].arcDistance(35.658871,139.701238)"
         }},
      "p_dist": {
         "script": {
             "lang": "painless",
             "source": "doc['loc'].planeDistance(35.658871,139.701238)"
         }}
 },
 "_source":["name","loc"]
}

↓

一応クエリは返ってきますが、次のfailuresを含みます。

    "failures": [
      {
        "shard": 2,
        "index": "ginza",
        "node": "NHxID4CmSDa21PyG_F0kHg",
        "reason": {
          "type": "script_exception",
          "reason": "runtime error",
          "script_stack": [
            "org.elasticsearch.index.fielddata.ScriptDocValues$GeoPoints.arcDistance(ScriptDocValues.java:501)",
            "doc['loc'].arcDistance(35.658871,139.701238)",
            "          ^---- HERE"
          ],
          "script": "doc['loc'].arcDistance(35.658871,139.701238)",
          "lang": "painless",
          "caused_by": {
            "type": "null_pointer_exception",
            "reason": null
          }
        }
      }
    ]

つまるところ、座標が無い、先ほど追加したid=20のレコードが処理対象になってしまうみたいですね。

では、painless scriptでnull判定を行ってエラーにならないようにしましょう。

POST ginza/_doc/_search
{
  "size":100,
  "query": {
    "bool": {
      "must": [
          {"match_all":{}}
      ]
    }
  },
  "sort": [
    "id"
  ],
  "script_fields": {
     "a_dist": {
         "script": {
             "lang": "painless",
             "source": "if(doc['loc'].value != null ){ return doc['loc'].arcDistance(35.658871,139.701238) } else{ return null }"
         }}
 },
 "_source":["name","loc"]
}

↓

結果は略するが、エラーは解消。

なお、公式だと、この類のものはsize()が0かどうか判定するという例が記載されています。

https://www.elastic.co/guide/en/elasticsearch/painless/7.0/painless-examples.html#_missing_values

渋谷駅から近い順に加点する

function_scoreおよびそれの実際の設定であるfunctionsを使って加点の指定を行います。

なお、元情報が数値や緯度経度、座標に関しては、いずれも中心点にあたるものからの「距離」が遠ざかるほど、減点されるというスコアリングが有用です。

また、Elasticsearchのクエリとしても、実際に常套句として「減衰関数」のfunctionクエリが用意されています。

下記の例では、「gauss」という減衰関数(decay function)を使っています。

多分ガウス関数などで見られるS字の曲線そのものだと思うのですが、中心点にある程度近いものは減点がなだらか、ある値周辺を境にしばらく急な傾きで減点が続いて、ある値周辺を境に再び減点がなだらかになります。

つまるところ、例えば

徒歩で歩ける範囲は、

3分の距離と5分の距離はそれほど違いがないと感じるものの

10分を超えると少しめんどくささい気持ちが加速して、

逆に30分以上は、歩くつもりはない(ので興味がない... もしくはそれでも向かわなければならないモチベーションがあるなら、車で移動するのでその場合は徒歩40分も60分も車であれば大差ない...)

という感ににそっていると思います。

減衰関数は、他にもlinearやexpがありますが、私見ではgaussが一番、上記のユースケースにそっているような気がします。

詳しくは、公式の

Function score query | Elasticsearch Reference [7.6] | Elastic

のセクションの最後の方にある、「decay」「scale」「offset」「origin」の説明も兼ねた関数のグラフの画像がわかりやすいと思います。

で、その減衰関数を使った距離加点(中心点から離れると減点)の例です。

POST ginza/_doc/_search
{
  "size": 100,
  "query": {
    "function_score": {
      "query": {
        "bool": {
          "must": [
            {
              "match_all": {}
            }
          ],
          "filter": [
            {
              "geo_bounding_box": {
                "loc": {
                  "top_left": "35.712,139.7",
                  "bottom_right": "35.657,139.798"
                }
              }
            }
          ]
        }
      },
      "boost": "1",
      "boost_mode": "multiply",
      "score_mode": "sum",
      "functions": [
        {
          "gauss": {
            "loc": {
              "origin": "35.658871,139.701238",
              "offset": "0km",
              "scale": "5km",
              "decay": 0.01
            }
          },
          "weight": 1
        }
      ]
    }
  },
  "script_fields": {
    "a_dist": {
      "script": {
        "lang": "painless",
        "source": "doc['loc'].arcDistance(35.658871,139.701238)"
      }
    }
  },
  "_source":["name"]
}

※ 2019/4/26 
    誤記で
       decay:0
 となっているところがあったので、0.01に見直しました。

↓ 結果

{
  "hits": {
    "hits": [
      {
        "_score": 1,
        "fields": {
          "a_dist": [
            0.001059014854358351
          ]
        }
      },
      {
        "_score": 0.7549775,
        "fields": {
          "a_dist": [
            1226.3385782660855
          ]
        }
      },
      {
        "_score": 0.60371536,
        "fields": {
          "a_dist": [
            1983.4067075198245
          ]
        }
      },
      {
        "_score": 0.48381922,
        "fields": {
          "a_dist": [
            2583.4874809390394
          ]
        }
      },
      {
        "_score": 0.23820671,
        "fields": {
          "a_dist": [
            3812.779265059126
          ]
        }
      },
      {
        "_score": 0.20415527,
        "fields": {
          "a_dist": [
            3983.2068896046344
          ]
        }
      },
      {
        "_score": 0.08726448,
        "fields": {
          "a_dist": [
            4568.245932624003
          ]
        }
      },
      {
        "_score": 0,
        "fields": {
          "a_dist": [
            8035.270236780703
          ]
        }
      },
      {
        "_score": 0,
        "fields": {
          "a_dist": [
            10439.71264168335
          ]
        }
      },
      {
        "_score": 0,
        "fields": {
          "a_dist": [
            5253.984620891842
          ]
        }
      },
     ..........


なお、script_fieldsの「a_dist」は、ここでは加点とは関係ありません。

これがあると検索結果に距離が含まれるので、実際の「減点傾向」がよくわかるためです。

また、boost、boots_mode、score_modeは、(説明がめんどくさいので)ここではおまじないだと思ってください。 (本バージョンのElasticsearchだとおそらくデフォルト設定と同じなので指定しなくても同じ動きになります。)

ここでは、geo検索まわりの説明に絞っているのでそれほど意味が感じられませんが、他の加点要素との組み合わせのパラメータになります。

これらのパラメータについては、詳しくは、次のページの最初のセクションをごらんください。

www.elastic.co

さて、再びGEO検索目線み戻って、追加の解説をします。

減衰関数(ここではgauss)のところにoffset、scale、decayというサブプロパティを設定しています。

これらは、expやlinearでも共通設定項目です。

gauss、exp、linearそれぞれの性質によりますが、いずれも用いたとしても、このoffset、scale、decayのような値を設定した場合、中心点は1点として、5kmぐらいまで減点して、5km離れたあたり以降は全て0点とするようなイメージの設定です。

あえて数式のことを考えなければ、ただし、先述リンク先のグラフの画像のイメージが頭にあればという前提ですが、「decay」は0点にするつもりの0を設定するイメージで0よりちょっと大きい値を設定するという間に合わせ理解でも「使うだけなら」問題ないかと思います。*2

function_scoreによるgeo系検索でぼちぼちあるあるなスコアリング設定の例

冒頭にインデックスにバルクロードしたデータに「x」と「y」というプロパティがありました。

これは、その駅で乗り換えられる路線数です。「x」がJR・私鉄の合計、「y」がJRの乗り換え可能路線数です。

この値から連想して次のような加点を考えてみます。

  1. 乗り換えられる路線が多い駅は加点する。
  2. JRに乗り換えられる駅はさらに加点する。ここで、JRに乗り換えられない駅は加点しない。
  3. 乗り換えられる路線が3つ以上の駅は、さらにボーナス加点する。

および、そもそも前述の例と同様に渋谷駅から近い方の駅を減衰関数で加点する。

それぞれ、script_score、field_value_factor、(function_scoreの)filterという仕掛けを使います。 (script_scoreはちょいと手抜きです。ポイントはscoring コンテキストでの、painless scriptが使えるというところです。)

つまるところ、渋谷駅から近い方が嬉しいが、近い駅で乗り換え選択肢が少ないよりは、乗り換えの選択が多い駅を優先(ただしあんまり遠いのは嫌)のような例になります。

↓実際のクエリは下記のとおり。

POST ginza/_doc/_search?filter_path=*.*._s*,*.*.f*
{
  "size": 100,
  "query": {
    "function_score": {
      "query": {
        "bool": {
          "must": [
            {
              "match_all": {}
            }
          ],
          "filter": [
            {
              "geo_bounding_box": {
                "loc": {
                  "top_left": "35.712,139.7",
                  "bottom_right": "35.657,139.798"
                }
              }
            }
          ]
        }
      },
      "boost": "1",
      "boost_mode": "multiply",
      "score_mode": "sum",
      "functions": [
        {
          "gauss": {
            "loc": {
              "origin": "35.658871,139.701238",
              "offset": "0km",
              "scale": "5km",
              "decay": 0.001
            }
          },
          "weight": 1
        },
        {
          "script_score": {
            "script": {
              "source": "doc['x'].value"
            }
          }
        },
        {
          "field_value_factor": {
            "field": "y",
            "factor": 0.8,
            "missing": 0
          }
        },
        {
           "filter": { "range": { "x": { "gte": 3 } }},
            "weight": 2
        }
      ]
    }
  },
  "script_fields": {
    "a_dist": {
      "script": {
        "lang": "painless",
        "source": "doc['loc'].arcDistance(35.658871,139.701238)"
      }
    }
  },
  "_source": [
    "name"
  ]
}


↓ 結果: 新橋や上野など節目の駅が加点によりせり上がってきている(...と言えると思う)


{
  "hits": {
    "hits": [
      {
        "_score": 10.4,
        "_source": {
          "name": "渋谷"
        },
        "fields": {
          "a_dist": [
            0
          ]
        }
      },
      {
        "_score": 6.6004868,
        "_source": {
          "name": "新橋"
        },
        "fields": {
          "a_dist": [
            5253.984537158694
          ]
        }
      },
      {
        "_score": 6.6,
        "_source": {
          "name": "上野"
        },
        "fields": {
          "a_dist": [
            9010.821109908235
          ]
        }
      },
      {
        "_score": 6.0180106,
        "_source": {
          "name": "赤坂見附"
        },
        "fields": {
          "a_dist": [
            3812.779221045536
          ]
        }
      },
      {
        "_score": 5.0124764,
        "_source": {
          "name": "溜池山王"
        },
        "fields": {
          "a_dist": [
            3983.2068434617145
          ]
        }
      },
      {
        "_score": 2.8000004,
        "_source": {
          "name": "三越前"
        },
        "fields": {
          "a_dist": [
            7250.651361098846
          ]
        }
      },
      {
        "_score": 2.8,
        "_source": {
          "name": "浅草"
        },
        "fields": {
          "a_dist": [
            10439.711719992803
          ]
        }
      },
      {
        "_score": 2.6599808,
        "_source": {
          "name": "表参道"
        },
        "fields": {
          "a_dist": [
            1226.3385739214464
          ]
        }
      },
      {
        "_score": 2.1581507,
        "_source": {
          "name": "青山一丁目"
        },
        "fields": {
          "a_dist": [
            2583.487466760376
          ]
        }
      },

以上で終わりです

あとで、Painless Scriptのその他読みこなしておくと便利な公式リファレンスへのポインタを自分用に貼り付ける予定。

*1:駅データ.jpのフリー版データの銀座線の駅一覧を利用させてもらいました

*2:つまり私のことか!

駅データ.jpをサンプルに使ったElasticsearchのGeo検索のクエリ例、事前準備のPandas、Pyprojでのデータ加工、Pythonクライアントでのバルクロード、Pythonクライアントでの検索、

はじめに

駅データ.jpという駅の路線データおよび緯度経度の座標を管理してありフリーでも利用可能な*1データが提供されています。

www.ekidata.jp

今回駅データ.jpのデータについて、次のチュートリアル(ひとまず動くサンプルコードを動かしてみるの意)として良さそうな例だったので、無料データをインプットにサンプルコードをまとめてました。

  • ElasticsearchのGeo検索(のクエリの形式)
  • Elasticsearchの公式Pythonクライアントライブラリを使った、バルクロード、検索の例
  • Pandasのデータ変換
  • Pythonの緯度経度を扱う有名ライブラリであるPyprojで2点間の距離を求めてみる

サンプルコード

1. 駅データ.jpデータをJSONに変換しやすいCSVに編集

元ネタは正規化されたCSVデータですので、これを結合します。 (実際のところ、今回の範囲のElasticsearchを使ってみるの範囲では結合は不要です。) 距離を求めているところは、今回のElasticsearchでの検索とは直接関係ありません。 Pyprojを使った例をまぜこみたかったからです。

import pandas as pd
import numpy as np
import sys
import csv
import copy
import pyproj

sta_df = pd.read_csv('eki/station.csv')
pref_df = pd.read_csv('eki/pref.csv')
line_df = pd.read_csv('eki/line.csv')
join_df = pd.read_csv('eki/join.csv')

# join.csvは、終点用のエントリ(次の駅のコード)が存在しない仕様になっている。
# 今回、終点の駅も「次の駅までの距離」を求めたいため、終点の駅用にに対し終点の駅のひとつ前の駅を次の駅とみなしたエントリを追加する。
# 終点駅の一覧
terminal_stas = pd.DataFrame(list(set(join_df['station_cd2'].tolist()) -
                                  set(join_df['station_cd1'].tolist())), columns=['station_cd2'])

# 終点駅 -> 前の駅のデータを、join_dfの形式に合わせて生成
terminal_stas_join_df = pd.merge(terminal_stas, join_df[['line_cd', 'station_cd1', 'station_cd2']], on=[
    'station_cd2'], suffixes=('', '_r'), how='left')[['line_cd', 'station_cd2', 'station_cd1']].rename(columns={'station_cd2': 'station_cd1', 'station_cd1': 'station_cd2'})

# join_dfに追加
join_df = join_df.append([terminal_stas_join_df]).reset_index()


# DataFrameを作る
# 反対のエントリを作成
# 最初のjoin_dfにconcatする

sta_df = pd.merge(sta_df, pref_df, on=[
    'pref_cd'], suffixes=('', '_r'), how='left')
sta_df = pd.merge(sta_df, line_df, on=[
    'line_cd'], suffixes=('', '_r'), how='left')
sta_df = pd.merge(sta_df, join_df,
                  left_on=['station_cd'], right_on=['station_cd1'], suffixes=('', '_r'), how='left')

# join_dfから取得した、次の駅の緯度経度を取得
sta_df = pd.merge(sta_df, sta_df[['station_cd', 'lon', 'lat']],
                  left_on=['station_cd2'], right_on=['station_cd'], suffixes=('', '_next'), how='left')


# 使いそうな項目
columns = ['station_cd', 'station_name', 'line_cd', 'pref_cd', 'post', 'add', 'lon',
           'lat', 'e_status', 'e_sort', 'pref_name',
           'company_cd', 'line_name', 'line_name_k', 'line_name_h',
           'line_type', 'lon_r', 'lat_r', 'zoom', 'e_status_r',
           'e_sort', 'line_cd', 'station_cd1', 'station_cd2', 'lon_next', 'lat_next']


# 運営中の駅
df = sta_df[sta_df['e_status'] == 0]


def get_distance(series):
    if series.isnull()['lon_next']:
        return 0
    grs80 = pyproj.Geod(ellps='GRS80')  # GRS80楕円体
    x, y, distance = grs80.inv(
        series['lon'], series['lat'], series['lon_next'], series['lat_next'])
    return int(distance)


# 隣駅との距離を設定
df = df.assign(dist=lambda df: df.apply(get_distance, axis=1))

# ekijpall.csvを出力
df[['station_cd', 'station_name', 'lon', 'lat', 'pref_name',
    'line_name', 'dist']].to_csv('ekijpall.csv', sep='\t', header=True)

2. JSONを作成〜Pythonクライアントでバルクロード

バルクロードします。前のステップの出力ファイルであるekijpall.csvを読み込んでいます。 なお、コード中の次のくだりで、緯度経度をElasticsearchのgeo_point型のデータ登録に合うように配列形式に編集しています。

df = df.assign(location=df'lon', 'lat'.values.tolist())

from elasticsearch import Elasticsearch, helpers
import pandas as pd
import numpy as np
import json
from pandas.io.json import json_normalize
import copy

es = Elasticsearch(host='localhost', port=9200)
INDEX = "x-idx"

"""
kibanaのDevToolsなどで次のmappings設定をしておきましょう。
PUT x-idx
{
    "mappings": {
        "_doc": {
            "properties": {
                "location": {"type": "geo_point"}
            }
        }
    }
}
"""

# 事前に作り込んだ駅データ.jpのデータを読み込み
df = pd.read_csv('ekijpall.csv', sep='\t')  # , skiprows=lambda x: x > 1000)

df = df.assign(location=df[['lon', 'lat']].values.tolist())

# JSON Linesを作る
df_lines = df.to_json(
    force_ascii=False, orient='records', lines=True)

# バルクロードの準備
actions = []
for i in iter(df_lines.split("\n")):
    v_json = json.loads(i)
    actions.append({
        "_index": INDEX,
        "_type": "_doc",
        "_id": v_json["station_cd"],
        "_source": v_json
    })

# バルクロードする
#  https://elasticsearch-py.readthedocs.io/en/master/helpers.html?highlight=bulk
helpers.bulk(es, actions)

3. Elasticsearch Geo検索の例

記事を書くにあたりちょいと調べたことを盛り込みたかったのですが*2、余裕がないので、サンプルのサンプルに絞ったものに限定。

Pythonクライアントだと、コンストラクタを立ち上げて、そいつにsearchメソッドでクエリをかませると検索できるようです。

ここでは、geo_boundingとgeo_distanceを発行してみました。

from elasticsearch import Elasticsearch, helpers
import pandas as pd
import numpy as np
import json
from pandas.io.json import json_normalize
import copy


es = Elasticsearch(host='localhost', port=9200)
INDEX = "x-idx"

x = es.search(index=INDEX, body={"query":
                                 {"geo_bounding_box":
                                  {"location":
                                   {"top_left":
                                    {"lat": 36, "lon": 140},
                                       "bottom_right":
                                       {"lat": 32, "lon": 139.3}
                                    }
                                   }
                                  }
                                 })

print(x)

x = es.search(index=INDEX, body={"query":
                                 {"geo_distance":
                                  {
                                      "distance": "500m",
                                      "location": {
                                          "lat": 35.6983573,
                                          "lon": 139.7709256
                                      }
                                  }
                                  }
                                 })

print(x)


参考にさせていただいたサイト等のリンク

pyproj [いかたこのたこつぼ]

Helpers — Elasticsearch 6.3.1 documentation

Geo queries | Elasticsearch Reference [6.6] | Elastic

Elasticsearch 5系で距離を算出するscriptをpainlessで書く - Qiita

検索順位を自在に操る | Elastic

免責事項

一応他所様のデータを使った内容なので免責事項的なところをひとこと。

この記事は、実質PandasとElasticsearchの勉強メモです。 だれかの参考になればということで、○○風のデータがあれば、それをPandasでこんな加工をして、こんな感じで取り込めば、こういう活用ができるよねという技術メモをフリーハンドで記載したものです。 当たり前の話ですが念のため記載しておくと、本当にどこかで使う場合は、データ提供元が定める利用規約を確認してください。

また、PandasやElasticsearchの各種コード例ですが、ひとまず私自身も含めて初学者の方がきっかけをつかむための、見てのとおりの完成度です。 言うまでもなく、そのままプロダクトに取り込めるレベルではないですし、プロダクトと言わずもなんらかコピペ実行するなどの際は、ご注意ください。

*1:商用データもあります。商用・フリー問わず、実際の利用は提供元が示している権利関係や利用規約をご覧ください

*2:なんか勿体つけている感じですが、自分への将来の宿題としての意味...

「街区レベル位置参照情報」を使って市区町村を囲むような長方形の4角の緯度経度を算出する体でのPandasとLeafletのサンプルプログラム

はじめに

行政が公開している「街区レベル位置参照情報」というデータを使って、↓こんな感じで、ある市区町村を囲むような4角形の矩形をあぶりだせないか試してみました。

... という体裁をとった、PandasとLeafletのサンプルプログラムを動かしてみた・やってみた記事です。

f:id:azotar:20190326211747p:plain

nlftp.mlit.go.jp

入力データのダウンロード

上記のページから、位置参照情報ダウンロードサービスのリンクを辿って次の ページに行ってください。

位置参照情報ダウンロード

ここから、何画面か、データをダウンロードしたい地域と欲しいデータが「街区レベル」か「大字・町丁目レベル」かを選択するかをいく通りかの方法で選べる画面遷移です。

画面遷移がちょっと説明が難しいのと最後に同意を求められるので、直リンクはしません。

ただ、どのルートで選んでも最終的には、ある地域一帯のデータがダウンロードできる画面に行くようなので、そこで対象としている地域の「街区レベル」のデータをダウンロードしてください。

※ファイルはSJISです。変換コマンドなどでutf8に変換してください。ここでは、utf8に変換したものとして説明を続けます。

ダウンロードしたデータを集計

「街区レベル位置参照情報」は、その地域の全ての街区レベルの住所の代表地点の緯度経度を保持しています(いるようです)。詳しくは提供元の仕様書やダウンロード時に同梱のhtmlファイルに記述がありますのでそちらを参照ください。

ここで、このデータから、今回あぶり出したいデータ、どのようにあぶり出しするかのアイディアは次の図のようなイメージです。

f:id:azotar:20190326212713p:plain

そんなに難しい話ではないですね。

もし本気でこのデータを使うとすれば、元のデータが都合のよいものになっているかどうかが重要ですが、ここではPandasで遊んでみて、Leafletで遊んでみる目的なので、いざPandasでこのようなデータを編集してみます。

データ編集のプログラム例

import pandas as pd
import numpy as np
import sys
import csv
import copy


# 13_2017.utf8.csv はインプットデータ。ここでは東京のデータ。30MBほどなのでここでは一括で処理しましたが、少しずつためしたい場合は、read_csvのskiprowsオプションなどを利用するのが良い。
#  http: // nlftp.mlit.go.jp/isj/index.html
df = pd.read_csv('13_2017.utf8.csv',
                 low_memory=False) 


# 2つの座標の中心の座標を求める。ここではDEG形式なので多分この計算式でも良い。
def center(series):
    ss = list(map(float, list(series.sort_values())))
    return ss[0] + (ss[-1] - ss[0])/2


# 注:この指定方法はPandasの新しいバージョンではdeprecatedだが、ここでは実施内容がわかりやすいためこの指定方法
AGG_COND = {
    '緯度': {'lat_n': 'max', 'lat_c': center, 'lat_s': 'min'},
    '経度': {'lon_w': 'min',  'lon_c': center, 'lon_e': 'max'}
}

ADDRESS = ['都道府県名', '市区町村名']  # , '大字・丁目名']
GEO_LOC = ['緯度', '経度']

A_AND_G = copy.deepcopy(ADDRESS)
A_AND_G.extend(GEO_LOC)

# 今回は、市区町村レベルの囲みの長方形をあぶり出すため、ある市区町村の配下のデータでgroupbyする。
# 緯度経度の値はDEG形式だが、求めるのは最大・最小のため、ここでは数値とみなして、集計して良い。
locations = df[A_AND_G].groupby(ADDRESS).agg(AGG_COND)


def gen_latlngs(series):
    """
    DataFrameの緯度経度の集計値を「JavaScriptの配列定義の文字列」に当てはめる。
    ※この時点では意図がわかりにくいが、後のLeafletのサンプル例をアドホックひとまず動かすための仕掛け
    """
    lat_n = str(series[('緯度', 'lat_n')])
    lon_w = str(series[('経度', 'lon_w')])
    lon_e = str(series[('経度', 'lon_e')])
    lat_s = str(series[('緯度', 'lat_s')])
    lat_c = str(series[('緯度', 'lat_c')])
    lon_c = str(series[('経度', 'lon_c')])
    area = series['市区町村名'][0]
    ls = f"latlngs.push([[{lat_n},{lon_w}],[{lat_n},{lon_e}],[{lat_s},{lon_e}],[{lat_s},{lon_w}],[{lat_n},{lon_w}]]); "
    cn = f"centers.push([{lat_c},{lon_c}]);"
    ar = f"popups.push('{area}');"
    return ls + cn + ar


# 編集結果を出力
locations.reset_index().apply(gen_latlngs, axis=1).to_frame().to_csv(
    sys.stdout, index=False, quoting=csv.QUOTE_NONE, escapechar=' ')

"""
標準出力に下記のイメージのテキストがつらつら出力
 latlngs.push([[35.745558, 139.677755] , [35.745558 , 139.752208] , [35.712728000000006 , 139.752208] , [35.712728000000006 , 139.677755] , [35.745558 , 139.677755]])
 centers.push([35.72914300000001, 139.7149815])
 popups.push('○○区')
"""

Leafletで地図表示の例

前述のプログラムの出力結果をよしなに取り込んだ次のHTMLファイルを保存して、Chromeなどのモダンブラウザ(?)で開いて見てください。 (※ 実際はちょっとだけ書き換えが必要になるので、下記、HTMLファイル中の解説は確認してくださいね。)

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8" />
    <title>Leaflet/OpenStreetMap他を使った市区町村枠表示<</title>
    <!--
    <link rel="stylesheet" href="https://unpkg.com/leaflet@1.4.0/dist/leaflet.css">
    <script src="https://unpkg.com/leaflet@1.4.0/dist/leaflet.js"></script>
    -->
    <link rel="stylesheet" href="http://cdn.leafletjs.com/leaflet/v1.4.0/leaflet.css" />
    <script src="http://cdn.leafletjs.com/leaflet/v1.4.0/leaflet.js"></script>
</head>
<body>
    <div id="mapid" style="width: 100%; height: 1200px;"></div>
</body>
<script>
    let mymap = null

    let latlngs = []
    let centers = []
    let popups = []
</script>

<!-- script src="./center_and_area_data.js"></script -->

<script>
// 前のPandasのプログラムの標準出力のテキストが、JavaScriptのスニペットなので、それをファイルにして↑のようにscript srcで読み込むか、下記↓のように貼り付け。
// ここではモダンブラウザであればHTTPサーバに配置しなくてもひとまず動かしてみる目的のものであり、もちろん、もっとちゃんとしたやり方があることには注意のこと。

 latlngs.push([[35.745558, 139.677755] , [35.745558 , 139.752208] , [35.712728000000006 , 139.752208] , [35.712728000000006 , 139.677755] , [35.745558 , 139.677755]])
 centers.push([35.72914300000001, 139.7149815])
 popups.push('○○区')

</script>

<script>
    mapInitCenter = centers[0]

    window.onload = function () {
        mymap = L.map('mapid', {
            center: centers[0],
            zoom: 13
        })
        let osm = L.tileLayer(
            'https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png',
            {
                attribution: 'Map data &copy; <a href="https://www.openstreetmap.org/">OpenStreetMap</a> contributors, <a href="https://creativecommons.org/licenses/by-sa/2.0/">CC-BY-SA</a>'
            }
        )

        let blank2 = L.tileLayer('https://cyberjapandata.gsi.go.jp/xyz/blank/{z}/{x}/{y}.png',
            { id: 'blankmap', attribution: "<a href='https://maps.gsi.go.jp/development/ichiran.html' target='_blank'>国土地理院</a>" })

        let seamlessphoto11 = L.tileLayer('https://cyberjapandata.gsi.go.jp/xyz/seamlessphoto/{z}/{x}/{y}.jpg',
            { id: 'blankmap', attribution: "<a href='https://maps.gsi.go.jp/development/ichiran.html' target='_blank'>国土地理院</a>" })

        let relief12 = L.tileLayer('https://cyberjapandata.gsi.go.jp/xyz/relief/{z}/{x}/{y}.png',
            { id: 'blankmap', attribution: "<a href='https://maps.gsi.go.jp/development/ichiran.html' target='_blank'>国土地理院</a>" })

        //osm.addTo(mymap)
        blank2.addTo(mymap)
        //seamlessphoto11.addTo(mymap)
        //relief12.addTo(mymap)

        let popupObj = []
        for (let i = 0; i < centers.length; i++) {
            // 直線の色をランダムに生成する(ことにした)
            let lineStyle = {
                "color": `rgb(${~~(256 * Math.random())}, ${~~(256 * Math.random())}, ${~~(256 * Math.random())})`,
                "weight": 5
            }
            // 直線の地図のインスタンスに描く。ここでは、四角形の4点の座標について、左上、右上、右下、左下の配列を引き渡すので、この座標を結ぶような直線が引かれることになり、長方形になる。
            L.polyline(latlngs[i], lineStyle).addTo(mymap)

            // ポップアップを表示。今回やりたいことからすると必須ではないが、Leafletの練習としてやってみたの例。
            popupObj.push(new L.Popup({ 'autoClose': true }))
            popupObj[i].setLatLng(centers[i]).setContent(popups[i])
            L.marker(centers[i]).addTo(mymap).bindPopup(popupObj[i]).openPopup()
        }

    }

</script>
</html>


参考

この記事の参考書籍↓です。 また、この号の「便利帳」という章には地理関係のオープンデータのリンク集とライセンス条件等をまとめた表が載っているので、ここで紹介されているリンクを探って行くと、この記事で扱ったような編集もそもそも不要で、加えてより精細なShapeデータなども得られるかもしれません。

Interface(インターフェース) 2019年 04 月号

免責事項

一応他所様のデータを使った内容なので免責事項的なところをひとこと。

この記事は、実質pandasとLeafletの勉強メモです。 だれかの参考になればということで、○○風のデータがあれば、それをpandasでこんな加工をして、こんな感じで取り込めば、こういう活用ができるよねという技術メモをフリーハンドで記載したものです。 当たり前の話ですが念のため記載しておくと、本当にどこかで使う場合は、データ提供元が定める利用規約を確認してください。

また、各種コード例ですが、ひとまず私自身も含めて初学者の方がきっかけをつかむための、見てのとおりの完成度です。 言うまでもなく、そのままプロダクトに取り込めるレベルではないですし、プロダクトと言わずもなんらかコピペ実行するなどの際は、ご注意ください。

Pandas(もしくはPython)のオレオレイディオム

はじめに

Elasticsearchに取り込むにはそのままではちょっとアレかなという類のデータを手間をかけずにPandasでデータ変換するにはというテーマで考え事をしてみました。

「よくある例」なのかは断言できませんが、ボキャブラリーとして手札にあれば、間に合わせには悪くないのではというもののサンプルコードの例をあげています。

なお、この記事は、次の記事の親戚記事です。

itdepends.hateblo.jp

↑ この記事は、本当にただのシンタックス一覧に過ぎないので、もう少しデータ処理っぽいことのさわりになるようなことをこの記事で補足しました。

itdepends.hateblo.jp

↑ この記事で使っているテクニック(?)を、もっと短めの本記事で少し分解してみたというものになります。

EAVデータ風のデータをJSONらしい体裁にバラす

ストーリー

SQLアンチパターンのEAVパターンをさらにこじらせて、オブジェクトの配列で保持している。 なんぼなんでもと思う面もあるが、このような複数のシステムからなるデータソースのデータを固定のデータフォーマットで永続化してレポジトリに保持しているといった場合にはないわけでもないのではと思ってストーリー設定。

→ このようなデータを、シンプルなJSONオブジェクトに変換する。

※ Pandasというよりは、dict(JSON)データのネスト階層の変換の例。   (是非は別として)この類のメタな処理が、型の扱いが緩めな言語では取り扱いしやすい。(取り扱いしやすい ≒ ひとまず動かすまでのタイプ量が少ない)

DataFrame中のJSON(dict)格納データの重複確認

この例におけるイディオム

  • PandasのDataFrameのある1列にJSON(dict)を保持している場合のグループ化・重複確認
  • ※ 処理の見栄えとしては重複確認だが、ストーリーとしては、おおよそ同一データであるとして「名寄せによるデータ統合・ユニーク化」できるはずだが、そうでないものが混ざっていて、そのようなものを見つけたいという方向性。
  •  ※ Pythonにおいてのdictの比較は、元にしたJSONなどのプロパティの並び順には関係なく(もともと意味がないので当たり前だが)同じものは同じとして比較できるが、Pandasは通常の型であれば重複確認の標準関数があるが、dictの場合は不可なので、別のアイディアを活用。
  • 重複チェック(上記のとおりどちらかといえば、ユニークにできない不明データの存在チェック)時の切り分けフラグ(ここでは、has_cousinというプロパティでラベルづけすることにした)を付与するような論理。
  • 名寄せによるデータ統合を行うが、名寄せ条件に含めないあるカラムの値については、配下のレコードのうち、これこれの値があれば、それを統合データの値とする...のような「生き」の条件をforループを使わずに選定する論理の例。
  • その他、上記のようなユースケースにありがちなちょっとしたクレンジングの小品をいくつか。

関連エンティティ相当のテーブルの情報を重心側のデータを軸にしたJSONデータに変換

ストーリー

多少デフォルメして説明すると、

cust_id product_cd
111 AAA
111 CCC
111 DDD

↑このようなデータを次↓に変換します。

[
  { "cust_id" : 111, "product_cd" : ["AAA", "CCC", "DDD"] },
...
]
 

関連エンティティ相当のテーブルの情報を、1対Nの1側の情報を軸にして、1側をキーとしたJSON情報に非正規化する。

イディオム

  • pd.mergeで、コード値と表示名マスタを結合して、表示名を取得。
  • 列Xでグループ化し、グループしたレコードについて、列Yの値を配列に詰め込みする。→ そのグループ内の列Yの値のバリエーションを得る。
  • 列Xのグループ化の対象とする行は、列Zの値がxxxのもののみ(以下では、SPECIALSという定数で定義)とする、といった条件付きのグループ化とする。

関連エンティティ風のデータの前日と当日の比較

ストーリー

昨日と今日の契約期中の取引情報全件(ただし、関連エンティティ型となっており、顧客単位に名寄せする必要がある)があって、変更があった顧客のリストを抜き出す。

この例で示しているイディオム

  • 昨日と今日それぞれの顧客単位の取引の配列の一覧を作成。
  • 上記を顧客IDで完全外部結合して、左に存在するが右に存在しないものは契約終了の顧客...のように変更検知する。

JSON中のNULL項目についてはプロパティを出力しない(そのような項目を取り除く)

ストーリー

DataFrameは便利だが、項目数が多い場合、いわゆるスパースマトリックスの無駄が発生する。また、項目の中にネストされているようなデータがある場合も何かと無駄が悩ましい。 このような状況において、スパースマトリックスの問題自体は他の場に譲るとして、最終結果をJSONファイルに出力する際に、Nullの項目についてはプロパティ名を出力しないようにすることで、データをコンパクトにする...という例。

※という意味では、andasのイディオムというよりは、dict(JSONと相互変換可能なものに限定されるが、JSONの範囲であれば、ネストが複雑でも再起によってなんとかなる)の再起処理の例に該当。

以上です。

検索サイトであるあるかもしれないインデックスimport前のドキュメント標準化をざっくり試すためのPython/Pandasのイディオムメモ

はじめに

検索サイトで、Elasticsearchなどの検索エンジンにデータを雑に食わせるにあたり、この用途にPandasが手頃(Pandasはもっとリッチなことができるはずだといった話はさておき)なような気がしてきので、サンプルコードを作成してみました。

f:id:azotar:20190324170507p:plain

この記事は、次の2つの記事のちょっとした続編です。

itdepends.hateblo.jp

itdepends.hateblo.jp

特に、後者の方の記事の真ん中あたりにある、

補足3: 自前アプリでの検索用ドキュメント標準化のフレームワーク

の節あたりがやや言いっ放しになっていたので*1、主張を補強するための例として、次のコードをまとめてみたのでした。

結果、Pandasのシンタックスを覚えたその次のコーディングのイディオムっぽくなったのと、Elasticsearchのなんらかの発展に貢献できればと思ってサンプルコードをのせてみます。

Pandas/Pythonのサンプルコード

サンプルコードでのアプローチイメージ(再掲) ↓

f:id:azotar:20190324170507p:plain

インプットファイルのイメージ

  {"id": 1, "a": {"b": {"c1": [111, 222, 333], "c2":111}}, "x":"あああ", "y":"いいい", "z1":"ううう", "to_be_deleted":"消される運命", "lat":"35.01.23.456", "lon":"139.01.23.456"},
     {"id": 2, "a": {"b": {"c1": [333, 444, 555]}}, "x": "えええ",
         "y": "おおお", "z2": "かかか", "lat": "35.01.23.456", "lon": "139.01.23.456"},
     {"id": 3, "x": "えええ", "lat": "35.01.23.456", "lon": "139.01.23.456",
               "rel1": True, "rel2": "この値はクリアされる", "rel3": "この値はクリアされる"},
     {"id": 4, "x": "えええ", "lat": "35.01.23.456", "lon": "139.01.23.456",
               "rel1": True, "rel2": "この値はクリアされる", "rel3": "この値はクリアされる"},
     {"id": 5, "lat": "35.01.23.456", "lon": "139.01.23.456",
         "dumpobj": [{"one": 5, "two": 6, "three": [
             7, 8, 9], "four": None, "five": [], "six": [{"a": "100"}]}]
      }

サンプルコード

import pandas as pd
import numpy as np
import json
from pandas.io.json import json_normalize
import copy

# ---------------------  関数定義 ----------------------------

# 表示名に変換


def map2dispname(obj, mapping):
    """
    コード値を格納したobjに対して、mapping(dict型) で定義された表示名に置き換えた値を返す。
    """
    if type(obj) is float:
        # floatの場合は、処理対象外のため、便宜上、空文字列を返す。
        return ""  # TBD
    elif type(obj) is str or type(obj) is int:
        # strもしくはintの場合
        return mapping.get(str(obj), str(obj))
    elif hasattr(obj, "__iter__"):
        # 列挙型の場合(返り値はlist型に強制)
        return list(map(lambda y: mapping.get(str(y), str(y)), obj))
    else:
        return str(obj)


def series_to_arrfield(v):
    """ 
    Seriesを配列風の1つの文字列に変換
    """
    return ' '.join(map(str, v.fillna('').values))


def funny_dms_to_deg(dms_str):
    """
    その場しのぎのadhocな緯度経度形式の変換の関数(次に定義のdms_to_degを優先したので実際は未使用)
    """
    if pd.isnull(dms_str):
        return None
    else:
        dms_str_arr = [s for s in str(dms_str).replace(
            'E', '').replace('N', '').split('.')]
        deg = dms_str_arr[0] + '.' + dms_str_arr[1] + \
            dms_str_arr[2] + dms_str_arr[3]
        if deg == None:
            return None
        return float(deg)


def dms_to_deg(dms_str):
    """
    緯度経度をDMS形式からDEG形式に変換する関数
    (この例で臨場感を出すために設けたが、このロジック自体はご紹介したいものの主題ではない。)
    """
    if pd.isnull(dms_str):
        return None
    else:
        dms_str_arr = [s for s in str(dms_str).replace(
            'E', '').replace('N', '').split('.')]
        deg = int(dms_str_arr[0])
        + int(dms_str_arr[1])/60
        + (int(dms_str_arr[2]) + int(dms_str_arr[3])/1000)/3600
        if deg == None:
            return None
        return deg


dtd = dms_to_deg


def all_values(obj):
    """
    dictinary型(≒ JSON)のある階層より下の最下層のコンテンツ(value)を文字列でダンプする。
    データソースは複雑な階層のJSONだが、Elasticsearchなど検索エンジンのインデックスに取り込む際は、生データのテキストの列挙で良い...という要件の場合に、ドキュメントのプレ加工を行うことをイメージしている。
    """
    # コンセプトメモ:
    # ターゲットが、JSON由来のデータなので、定石として、シンプルな再起処理でデータを掘り下げられるハズだが、いざ必要になった際にフリーハンドで実現するのも手間なので、イディオムとして残してみた。
    # なお、dictinary型(≒ JSON)のNoneのフィールドを除去する方針とした。
    #  ※もっと簡単に実装できる気がする。そもそもライブラリやイディオムがあるような気もする。
    #  例えば、DataFrame.values()をうまくつかえばもっとシンプルな記述にできるかもしれない。

    def wrap_all_values(val):
        """
        内部関数
        # 最下層のコンテンツかどうかを判定し、文字列を返す。最下層でなければ、all_valuesを呼び出す
        """
        if val is None:
            return ""
        elif isinstance(val, str) or isinstance(val, int):
            return str(val)
        else:
            return str(all_values(val))

    DELEMITER = ','
    if obj is None:
        return ""
    vt = ""
    if isinstance(obj, dict):
        for val in obj.values():
            vt = vt + DELEMITER + wrap_all_values(val)
    elif isinstance(obj, list):
        for val in obj:
            vt = vt + DELEMITER + wrap_all_values(val)
    return vt


# JSONの「null」や空文字、空の配列のフィールドのプロパティを削除する
# -------------インプットデータやコンフィグ情報など ---------------------
# 仮想コンフィグファイル
J2J_CONFIG = [
    {"FN": "a.b.c1", "M2D": "111:NewYork, 222:California,333:Texas,444:Hawaii"},
    {"FN": "a.b.c2", "M2D": "111:NewYork, 222:California,333:Texas,444:Hawaii"},
    {"FN": "x", "CNC": True, "PSH": "newarray"},
    {"FN": "y", "CNC": True, "PSH": "newarray"},
    {"FN": "z1", "CNC": True, "PSH": "newarray"},
    {"FN": "z2", "CNC": True,  "PSH": "newarray"},
    {"FN": "z3", "CNC": True},
    {"FN": "rel1"},
    {"FN": "rel2", "PRN": "rel1"},
    {"FN": "rel3", "PRN": "rel1"},
    {"FN": "to_be_deleted", "DL": True},
    {"FN": "create_user", "DL": True},
    {"FN": "update_user", "DL": True},
    {"FN": "dumpobj", "VLS": True}
]
"""
コンフィグの考え方:

後述のインプットデータの項目名(=DataFrameの列名) が、「fn」で定義されているフィールドについて、「M2D」にdict定義風の文字列で、コード値から表示名への定義がされていれば、その項目を表示名に変換した新たな項目を派生させる。
(元ネタは、EXCELで定義された変換仕様表的なやつをイメージ。(どこかの界隈で好まれそうなアプローチですね...))
「M2D」をはじめ「cnc」「PSH」「PRN」「DL」「VLS」も変換内容やこの設定の考え方は違えど同じような発想のもの。
 ※「PRN」は、他のものとやや異なり、その項目の親フィールド名を定義している。親フィールド名を定義して何をするかは、後述の実際の処理イメージを見てください。
]


"""


J2J_CONFIG_DF = pd.DataFrame(J2J_CONFIG)

# 仮想インプットJSON(複数レコード)
df = json_normalize(
    [{"id": 1, "a": {"b": {"c1": [111, 222, 333], "c2":111}}, "x":"あああ", "y":"いいい", "z1":"ううう", "to_be_deleted":"消される運命", "lat":"35.01.23.456", "lon":"139.01.23.456"},
     {"id": 2, "a": {"b": {"c1": [333, 444, 555]}}, "x": "えええ",
         "y": "おおお", "z2": "かかか", "lat": "35.01.23.456", "lon": "139.01.23.456"},
     {"id": 3, "x": "えええ", "lat": "35.01.23.456", "lon": "139.01.23.456",
               "rel1": True, "rel2": "この値はクリアされる", "rel3": "この値はクリアされる"},
     {"id": 4, "x": "えええ", "lat": "35.01.23.456", "lon": "139.01.23.456",
               "rel1": True, "rel2": "この値はクリアされる", "rel3": "この値はクリアされる"},
     {"id": 5, "lat": "35.01.23.456", "lon": "139.01.23.456",
         "dumpobj": [{"one": 5, "two": 6, "three": [
             7, 8, 9], "four": None, "five": [], "six": [{"a": "100"}]}]
      }
     ]
)


# -------------ここから変換処理 ---------------------


"""
PRN処理 ========================
# 親削除フラグ項目がTrueならクリアする。
  良くある論理削除フラグと似たようなもので、グループ項目全体を無効にする意味のフラグがONの場合、配下の項目はユーザー画面には表示しない...という類のものは、当然Elasticsearchの検索対象、表示項目にしたくないためそれをクリアしておくというような用途をイメージ。
※いきなり最もめんどくさいパターンからになるので、他のパターンを先に見てください。
"""

# 関連するコンフィグを抜き出す。 「(親フィールド, 子フィールドの配列名一覧)」を抜き出す
df_fields_and_parent = J2J_CONFIG_DF.loc[J2J_CONFIG_DF['PRN'].str.len() > 0].groupby(['PRN'])[
    'FN'].apply(list).reset_index()

# 条件付きクリア
for idx, row in df_fields_and_parent.iterrows():
    flg_field = row['PRN']
    clear_targets = row['FN']
    # 親フィールドがTrueの行について、子フィールドの値を空文字列(この例でのクリアの扱い)に設定して代入しなおす。
    df[clear_targets] = df[df[flg_field] == True][clear_targets].apply(
        lambda v: pd.Series(['' for s in clear_targets]), axis=1)


"""
  プレクリーニング ========================
  要件上、不要な項目をこの時点で削除。欠損値の補完/項目削除。特定の値の場合は、欠損値扱いにして削除。
  #  (未実装)
"""

pass

"""
 M2D処理 ====================
 表示名に変換
"""
df_maps = J2J_CONFIG_DF.loc[J2J_CONFIG_DF['M2D'].str.len() > 0]

for idx, row in df_maps.iterrows():
    m = {i.split(':')[0]: i.split(':')[1] for i in row['M2D'].split(',')}
    fname = row['FN']
    # 表示名を設定したフィールドを、「元のフィールド名+_M2D」に派生(代入)
    df[fname + '_M2D'] = df[fname].apply(
        lambda x: map2dispname(x, m))


"""
VLS処理 ========================
JSONの配下のvalueを全てダンプする

"""
# 当該フィールド(dict型)のものについて、配下の全ての値のみ取得し、出力する
df_dump_fields = J2J_CONFIG_DF.loc[J2J_CONFIG_DF['VLS'] == True]['FN']

for idx, val in df_dump_fields.iteritems():
    fname = val
    df[fname + '_VLS'] = df[fname].apply(lambda x: all_values(x))

"""
 SH処理 ========================
当該項目の内容を、コンフィグで指定された集約対象のフィールドにアペンドする。
情報源の上流システムがパッケージソフトなどによるRDBテーブルのカラム自動生成方式であり、入力画面では複数選択可能チェックボックスだが、選択肢ごとにRDBのテーブルの1カラムとなっているようなものを、検索時にキーワードでシンプルにヒットできるようにすることをイメージした変換。
※言葉にするとややこしいので、シンプルバージョンのCNC処理を先に見た方が良い。
"""

df_fields_into_one_array = J2J_CONFIG_DF.loc[J2J_CONFIG_DF['PSH'].str.len() > 0].groupby(['PSH'])[
    'FN'].apply(list).reset_index()

for idx, row in df_fields_into_one_array.iterrows():
    array_name = row['PSH']
    fields = row['FN']
    # ビジネスルール的視点ではアペンドだが、ここでは、対象フィールドの値の配列を作ってそれを代入
    df[array_name] = df[fields].fillna('').values.tolist()


"""
 CNC処理 ========================
これの処理対象の指定があるフィールドについて、指定項目の内容を、指定の(ここではCONCAT_FIELDという名前のDataFrameの列)にアペンドする。
実際は、前項のSH処理のシンプルバージョンである。
よって、SH処理でも代用できるが、CONCAT_FIELDというマジックフィールドをビジネスルール的に意識したいことから専用の論理にした。
"""

# 所定の項目を全て取り込んだ文字列フィールド CONCAT_FIELD
batch_concat_cols = J2J_CONFIG_DF.query('CNC == True')['FN'].tolist()
batch_concat_cols = list(set(batch_concat_cols) & set(df.columns))
df['CONCAT_FIELD'] = df[batch_concat_cols].apply(series_to_arrfield, axis=1)

"""
# 緯度経度の形式変換 ======================
#  今回のストーリーとしてはそれほど意味をなさない例だが、df['xxx']への代入ではなく、DataFrame.assign()を使ってみた例。
#  コンフィグによるものではなく、項目狙い撃ちのスペシャルな変換もあるよね...という例。
"""
df = df.assign(xlon=df['lon'].apply(dtd))
df = df.assign(xlat=df['lat'].apply(dtd))
df = df.assign(location=df[['xlon', 'xlat']].values.tolist())

"""
 DL処理 =========================
# 不要なフィールドは削除する
"""
batch_del_cols = J2J_CONFIG_DF.query('DL == True')['FN'].tolist()
batch_del_cols = list(set(batch_del_cols) & set(df.columns))
df = df.drop(labels=batch_del_cols, axis=1)


# -------------ここから出力処理 ---------------------

# JSON Lineで出力

df_lines = df.to_json(
    force_ascii=False, orient='records', lines=True)

# 課題メモ:
# この場合、JSONの最大プロパティ数が多いとnullや空の配列のデータが多く出力されるスパースなデータとなってしまう。


# Elasticsearchのバルクロード用の出力のJSON Lines
# データをJSON Lines出力するだけなら、DataFrame.to_jsonで事足りるのだが、Elasticsearchのバルクロードは、アクション(正式名称は忘れた。POST先のインデックス名やCRUDのどれかなどを示すもの。)と対象レコードを1対とした繰り返し形式のため、ひとまずこんな感じで生で出力。
for i in iter(df_lines.split("\n")):
    v_json = json.loads(i)
    print('{"index": { "_id" : ' + str(v_json['id']) + '}}')
    print(i)

サンプルコードの実行結果

nullヌルだらけになってしまいましたが、これはまた。

{"index": { "_id" : 1}}
{"a.b.c1":[111,222,333],"a.b.c2":111.0,"dumpobj":null,"id":1,"lat":"35.01.23.456","lon":"139.01.23.456","rel1":null,"rel2":null,"rel3":null,"x":"あああ","y":"いいい","z1":"ううう","z2":null,"a.b.c1_M2D":["NewYork","222","Texas"],"a.b.c2_M2D":"","dumpobj_VLS":"","newarray":["あああ","いいい","ううう",""],"CONCAT_FIELD":"いいい  あああ ううう","xlon":139,"xlat":35,"location":[139,35]}
{"index": { "_id" : 2}}
{"a.b.c1":[333,444,555],"a.b.c2":null,"dumpobj":null,"id":2,"lat":"35.01.23.456","lon":"139.01.23.456","rel1":null,"rel2":null,"rel3":null,"x":"えええ","y":"おおお","z1":null,"z2":"かかか","a.b.c1_M2D":["Texas","Hawaii","555"],"a.b.c2_M2D":"","dumpobj_VLS":"","newarray":["えええ","おおお","","かかか"],"CONCAT_FIELD":"おおお かかか えええ ","xlon":139,"xlat":35,"location":[139,35]}
{"index": { "_id" : 3}}
{"a.b.c1":null,"a.b.c2":null,"dumpobj":null,"id":3,"lat":"35.01.23.456","lon":"139.01.23.456","rel1":true,"rel2":"","rel3":"","x":"えええ","y":null,"z1":null,"z2":null,"a.b.c1_M2D":"","a.b.c2_M2D":"","dumpobj_VLS":"","newarray":["えええ","","",""],"CONCAT_FIELD":"  えええ ","xlon":139,"xlat":35,"location":[139,35]}
{"index": { "_id" : 4}}
{"a.b.c1":null,"a.b.c2":null,"dumpobj":null,"id":4,"lat":"35.01.23.456","lon":"139.01.23.456","rel1":true,"rel2":"","rel3":"","x":"えええ","y":null,"z1":null,"z2":null,"a.b.c1_M2D":"","a.b.c2_M2D":"","dumpobj_VLS":"","newarray":["えええ","","",""],"CONCAT_FIELD":"  えええ ","xlon":139,"xlat":35,"location":[139,35]}
{"index": { "_id" : 5}}
{"a.b.c1":null,"a.b.c2":null,"dumpobj":[{"one":5,"two":6,"three":[7,8,9],"four":null,"five":[],"six":[{"a":"100"}]}],"id":5,"lat":"35.01.23.456","lon":"139.01.23.456","rel1":null,"rel2":null,"rel3":null,"x":null,"y":null,"z1":null,"z2":null,"a.b.c1_M2D":"","a.b.c2_M2D":"","dumpobj_VLS":",,5,6,,7,8,9,,,,,100","newarray":["","","",""],"CONCAT_FIELD":"   ","xlon":139,"xlat":35,"location":[139,35]}

念のための免責事項

この記事は、実質pandasとElasticsearchの勉強メモです。 だれかの参考になればということで、○○風のデータがあれば、それをpandasでこんな加工をして、こんな感じで取り込めば、こういう活用ができるよねという技術メモをフリーハンドで記載したものです。

また、pandasやElasticsearchの各種コード例ですが、ひとまず私自身も含めて初学者の方がきっかけをつかむための、見てのとおりの完成度です。 言うまでもなく、そのままプロダクトに取り込めるレベルではないですし、この先は、できるだけ公式の情報源をあたっていただくとか、例えば、コーディングスタイルもpythonicなコードになるように磨き上げて抱く前提です。 (つまるところ、この記事の例は、そのレベルのものではないですよという話、言い訳です。念のため。)

*1:もともと位置づけからすると日曜大工ブログなので全体的に言いっ放しなのは宿命ですが...