はてだBlog(仮称)

私的なブログど真ん中のつもりでしたが、気づけばWebサイト系のアプリケーション開発周りで感じたこと寄りの自分メモなどをつれづれ述べています。2020年6月現在、Elasticsearch、pandas、CMSなどに関する話題が多めです。...ですが、だんだんとより私的なプログラムのスニペット置き場になりつつあります。ブログで述べている内容は所属組織で販売している製品などに関するものではなく、また所属する組織の見解を代表するものではありません。

駅データ.jpをサンプルに使ったElasticsearchのGeo検索のクエリ例、事前準備のPandas、Pyprojでのデータ加工、Pythonクライアントでのバルクロード、Pythonクライアントでの検索、

はじめに

駅データ.jpという駅の路線データおよび緯度経度の座標を管理してありフリーでも利用可能な*1データが提供されています。

www.ekidata.jp

今回駅データ.jpのデータについて、次のチュートリアル(ひとまず動くサンプルコードを動かしてみるの意)として良さそうな例だったので、無料データをインプットにサンプルコードをまとめてました。

  • ElasticsearchのGeo検索(のクエリの形式)
  • Elasticsearchの公式Pythonクライアントライブラリを使った、バルクロード、検索の例
  • Pandasのデータ変換
  • Pythonの緯度経度を扱う有名ライブラリであるPyprojで2点間の距離を求めてみる

サンプルコード

1. 駅データ.jpデータをJSONに変換しやすいCSVに編集

元ネタは正規化されたCSVデータですので、これを結合します。 (実際のところ、今回の範囲のElasticsearchを使ってみるの範囲では結合は不要です。) 距離を求めているところは、今回のElasticsearchでの検索とは直接関係ありません。 Pyprojを使った例をまぜこみたかったからです。

import pandas as pd
import numpy as np
import sys
import csv
import copy
import pyproj

sta_df = pd.read_csv('eki/station.csv')
pref_df = pd.read_csv('eki/pref.csv')
line_df = pd.read_csv('eki/line.csv')
join_df = pd.read_csv('eki/join.csv')

# join.csvは、終点用のエントリ(次の駅のコード)が存在しない仕様になっている。
# 今回、終点の駅も「次の駅までの距離」を求めたいため、終点の駅用にに対し終点の駅のひとつ前の駅を次の駅とみなしたエントリを追加する。
# 終点駅の一覧
terminal_stas = pd.DataFrame(list(set(join_df['station_cd2'].tolist()) -
                                  set(join_df['station_cd1'].tolist())), columns=['station_cd2'])

# 終点駅 -> 前の駅のデータを、join_dfの形式に合わせて生成
terminal_stas_join_df = pd.merge(terminal_stas, join_df[['line_cd', 'station_cd1', 'station_cd2']], on=[
    'station_cd2'], suffixes=('', '_r'), how='left')[['line_cd', 'station_cd2', 'station_cd1']].rename(columns={'station_cd2': 'station_cd1', 'station_cd1': 'station_cd2'})

# join_dfに追加
join_df = join_df.append([terminal_stas_join_df]).reset_index()


# DataFrameを作る
# 反対のエントリを作成
# 最初のjoin_dfにconcatする

sta_df = pd.merge(sta_df, pref_df, on=[
    'pref_cd'], suffixes=('', '_r'), how='left')
sta_df = pd.merge(sta_df, line_df, on=[
    'line_cd'], suffixes=('', '_r'), how='left')
sta_df = pd.merge(sta_df, join_df,
                  left_on=['station_cd'], right_on=['station_cd1'], suffixes=('', '_r'), how='left')

# join_dfから取得した、次の駅の緯度経度を取得
sta_df = pd.merge(sta_df, sta_df[['station_cd', 'lon', 'lat']],
                  left_on=['station_cd2'], right_on=['station_cd'], suffixes=('', '_next'), how='left')


# 使いそうな項目
columns = ['station_cd', 'station_name', 'line_cd', 'pref_cd', 'post', 'add', 'lon',
           'lat', 'e_status', 'e_sort', 'pref_name',
           'company_cd', 'line_name', 'line_name_k', 'line_name_h',
           'line_type', 'lon_r', 'lat_r', 'zoom', 'e_status_r',
           'e_sort', 'line_cd', 'station_cd1', 'station_cd2', 'lon_next', 'lat_next']


# 運営中の駅
df = sta_df[sta_df['e_status'] == 0]


def get_distance(series):
    if series.isnull()['lon_next']:
        return 0
    grs80 = pyproj.Geod(ellps='GRS80')  # GRS80楕円体
    x, y, distance = grs80.inv(
        series['lon'], series['lat'], series['lon_next'], series['lat_next'])
    return int(distance)


# 隣駅との距離を設定
df = df.assign(dist=lambda df: df.apply(get_distance, axis=1))

# ekijpall.csvを出力
df[['station_cd', 'station_name', 'lon', 'lat', 'pref_name',
    'line_name', 'dist']].to_csv('ekijpall.csv', sep='\t', header=True)

2. JSONを作成〜Pythonクライアントでバルクロード

バルクロードします。前のステップの出力ファイルであるekijpall.csvを読み込んでいます。 なお、コード中の次のくだりで、緯度経度をElasticsearchのgeo_point型のデータ登録に合うように配列形式に編集しています。

df = df.assign(location=df'lon', 'lat'.values.tolist())

from elasticsearch import Elasticsearch, helpers
import pandas as pd
import numpy as np
import json
from pandas.io.json import json_normalize
import copy

es = Elasticsearch(host='localhost', port=9200)
INDEX = "x-idx"

"""
kibanaのDevToolsなどで次のmappings設定をしておきましょう。
PUT x-idx
{
    "mappings": {
        "_doc": {
            "properties": {
                "location": {"type": "geo_point"}
            }
        }
    }
}
"""

# 事前に作り込んだ駅データ.jpのデータを読み込み
df = pd.read_csv('ekijpall.csv', sep='\t')  # , skiprows=lambda x: x > 1000)

df = df.assign(location=df[['lon', 'lat']].values.tolist())

# JSON Linesを作る
df_lines = df.to_json(
    force_ascii=False, orient='records', lines=True)

# バルクロードの準備
actions = []
for i in iter(df_lines.split("\n")):
    v_json = json.loads(i)
    actions.append({
        "_index": INDEX,
        "_type": "_doc",
        "_id": v_json["station_cd"],
        "_source": v_json
    })

# バルクロードする
#  https://elasticsearch-py.readthedocs.io/en/master/helpers.html?highlight=bulk
helpers.bulk(es, actions)

3. Elasticsearch Geo検索の例

記事を書くにあたりちょいと調べたことを盛り込みたかったのですが*2、余裕がないので、サンプルのサンプルに絞ったものに限定。

Pythonクライアントだと、コンストラクタを立ち上げて、そいつにsearchメソッドでクエリをかませると検索できるようです。

ここでは、geo_boundingとgeo_distanceを発行してみました。

from elasticsearch import Elasticsearch, helpers
import pandas as pd
import numpy as np
import json
from pandas.io.json import json_normalize
import copy


es = Elasticsearch(host='localhost', port=9200)
INDEX = "x-idx"

x = es.search(index=INDEX, body={"query":
                                 {"geo_bounding_box":
                                  {"location":
                                   {"top_left":
                                    {"lat": 36, "lon": 140},
                                       "bottom_right":
                                       {"lat": 32, "lon": 139.3}
                                    }
                                   }
                                  }
                                 })

print(x)

x = es.search(index=INDEX, body={"query":
                                 {"geo_distance":
                                  {
                                      "distance": "500m",
                                      "location": {
                                          "lat": 35.6983573,
                                          "lon": 139.7709256
                                      }
                                  }
                                  }
                                 })

print(x)


参考にさせていただいたサイト等のリンク

pyproj [いかたこのたこつぼ]

Helpers — Elasticsearch 6.3.1 documentation

Geo queries | Elasticsearch Reference [6.6] | Elastic

Elasticsearch 5系で距離を算出するscriptをpainlessで書く - Qiita

検索順位を自在に操る | Elastic

免責事項

一応他所様のデータを使った内容なので免責事項的なところをひとこと。

この記事は、実質PandasとElasticsearchの勉強メモです。 だれかの参考になればということで、○○風のデータがあれば、それをPandasでこんな加工をして、こんな感じで取り込めば、こういう活用ができるよねという技術メモをフリーハンドで記載したものです。 当たり前の話ですが念のため記載しておくと、本当にどこかで使う場合は、データ提供元が定める利用規約を確認してください。

また、PandasやElasticsearchの各種コード例ですが、ひとまず私自身も含めて初学者の方がきっかけをつかむための、見てのとおりの完成度です。 言うまでもなく、そのままプロダクトに取り込めるレベルではないですし、プロダクトと言わずもなんらかコピペ実行するなどの際は、ご注意ください。

*1:商用データもあります。商用・フリー問わず、実際の利用は提供元が示している権利関係や利用規約をご覧ください

*2:なんか勿体つけている感じですが、自分への将来の宿題としての意味...