はじめに
検索サイトで、Elasticsearchなどの検索エンジンにデータを雑に食わせるにあたり、この用途にPandasが手頃(Pandasはもっとリッチなことができるはずだといった話はさておき)なような気がしてきので、サンプルコードを作成してみました。
この記事は、次の2つの記事のちょっとした続編です。
特に、後者の方の記事の真ん中あたりにある、
補足3: 自前アプリでの検索用ドキュメント標準化のフレームワーク化
の節あたりがやや言いっ放しになっていたので*1、主張を補強するための例として、次のコードをまとめてみたのでした。
結果、Pandasのシンタックスを覚えたその次のコーディングのイディオムっぽくなったのと、Elasticsearchのなんらかの発展に貢献できればと思ってサンプルコードをのせてみます。
Pandas/Pythonのサンプルコード
サンプルコードでのアプローチイメージ(再掲) ↓
インプットファイルのイメージ
{"id": 1, "a": {"b": {"c1": [111, 222, 333], "c2":111}}, "x":"あああ", "y":"いいい", "z1":"ううう", "to_be_deleted":"消される運命", "lat":"35.01.23.456", "lon":"139.01.23.456"}, {"id": 2, "a": {"b": {"c1": [333, 444, 555]}}, "x": "えええ", "y": "おおお", "z2": "かかか", "lat": "35.01.23.456", "lon": "139.01.23.456"}, {"id": 3, "x": "えええ", "lat": "35.01.23.456", "lon": "139.01.23.456", "rel1": True, "rel2": "この値はクリアされる", "rel3": "この値はクリアされる"}, {"id": 4, "x": "えええ", "lat": "35.01.23.456", "lon": "139.01.23.456", "rel1": True, "rel2": "この値はクリアされる", "rel3": "この値はクリアされる"}, {"id": 5, "lat": "35.01.23.456", "lon": "139.01.23.456", "dumpobj": [{"one": 5, "two": 6, "three": [ 7, 8, 9], "four": None, "five": [], "six": [{"a": "100"}]}] }
サンプルコード
import pandas as pd import numpy as np import json from pandas.io.json import json_normalize import copy # --------------------- 関数定義 ---------------------------- # 表示名に変換 def map2dispname(obj, mapping): """ コード値を格納したobjに対して、mapping(dict型) で定義された表示名に置き換えた値を返す。 """ if type(obj) is float: # floatの場合は、処理対象外のため、便宜上、空文字列を返す。 return "" # TBD elif type(obj) is str or type(obj) is int: # strもしくはintの場合 return mapping.get(str(obj), str(obj)) elif hasattr(obj, "__iter__"): # 列挙型の場合(返り値はlist型に強制) return list(map(lambda y: mapping.get(str(y), str(y)), obj)) else: return str(obj) def series_to_arrfield(v): """ Seriesを配列風の1つの文字列に変換 """ return ' '.join(map(str, v.fillna('').values)) def funny_dms_to_deg(dms_str): """ その場しのぎのadhocな緯度経度形式の変換の関数(次に定義のdms_to_degを優先したので実際は未使用) """ if pd.isnull(dms_str): return None else: dms_str_arr = [s for s in str(dms_str).replace( 'E', '').replace('N', '').split('.')] deg = dms_str_arr[0] + '.' + dms_str_arr[1] + \ dms_str_arr[2] + dms_str_arr[3] if deg == None: return None return float(deg) def dms_to_deg(dms_str): """ 緯度経度をDMS形式からDEG形式に変換する関数 (この例で臨場感を出すために設けたが、このロジック自体はご紹介したいものの主題ではない。) """ if pd.isnull(dms_str): return None else: dms_str_arr = [s for s in str(dms_str).replace( 'E', '').replace('N', '').split('.')] deg = int(dms_str_arr[0]) + int(dms_str_arr[1])/60 + (int(dms_str_arr[2]) + int(dms_str_arr[3])/1000)/3600 if deg == None: return None return deg dtd = dms_to_deg def all_values(obj): """ dictinary型(≒ JSON)のある階層より下の最下層のコンテンツ(value)を文字列でダンプする。 データソースは複雑な階層のJSONだが、Elasticsearchなど検索エンジンのインデックスに取り込む際は、生データのテキストの列挙で良い...という要件の場合に、ドキュメントのプレ加工を行うことをイメージしている。 """ # コンセプトメモ: # ターゲットが、JSON由来のデータなので、定石として、シンプルな再起処理でデータを掘り下げられるハズだが、いざ必要になった際にフリーハンドで実現するのも手間なので、イディオムとして残してみた。 # なお、dictinary型(≒ JSON)のNoneのフィールドを除去する方針とした。 # ※もっと簡単に実装できる気がする。そもそもライブラリやイディオムがあるような気もする。 # 例えば、DataFrame.values()をうまくつかえばもっとシンプルな記述にできるかもしれない。 def wrap_all_values(val): """ 内部関数 # 最下層のコンテンツかどうかを判定し、文字列を返す。最下層でなければ、all_valuesを呼び出す """ if val is None: return "" elif isinstance(val, str) or isinstance(val, int): return str(val) else: return str(all_values(val)) DELEMITER = ',' if obj is None: return "" vt = "" if isinstance(obj, dict): for val in obj.values(): vt = vt + DELEMITER + wrap_all_values(val) elif isinstance(obj, list): for val in obj: vt = vt + DELEMITER + wrap_all_values(val) return vt # JSONの「null」や空文字、空の配列のフィールドのプロパティを削除する # -------------インプットデータやコンフィグ情報など --------------------- # 仮想コンフィグファイル J2J_CONFIG = [ {"FN": "a.b.c1", "M2D": "111:NewYork, 222:California,333:Texas,444:Hawaii"}, {"FN": "a.b.c2", "M2D": "111:NewYork, 222:California,333:Texas,444:Hawaii"}, {"FN": "x", "CNC": True, "PSH": "newarray"}, {"FN": "y", "CNC": True, "PSH": "newarray"}, {"FN": "z1", "CNC": True, "PSH": "newarray"}, {"FN": "z2", "CNC": True, "PSH": "newarray"}, {"FN": "z3", "CNC": True}, {"FN": "rel1"}, {"FN": "rel2", "PRN": "rel1"}, {"FN": "rel3", "PRN": "rel1"}, {"FN": "to_be_deleted", "DL": True}, {"FN": "create_user", "DL": True}, {"FN": "update_user", "DL": True}, {"FN": "dumpobj", "VLS": True} ] """ コンフィグの考え方: 後述のインプットデータの項目名(=DataFrameの列名) が、「fn」で定義されているフィールドについて、「M2D」にdict定義風の文字列で、コード値から表示名への定義がされていれば、その項目を表示名に変換した新たな項目を派生させる。 (元ネタは、EXCELで定義された変換仕様表的なやつをイメージ。(どこかの界隈で好まれそうなアプローチですね...)) 「M2D」をはじめ「cnc」「PSH」「PRN」「DL」「VLS」も変換内容やこの設定の考え方は違えど同じような発想のもの。 ※「PRN」は、他のものとやや異なり、その項目の親フィールド名を定義している。親フィールド名を定義して何をするかは、後述の実際の処理イメージを見てください。 ] """ J2J_CONFIG_DF = pd.DataFrame(J2J_CONFIG) # 仮想インプットJSON(複数レコード) df = json_normalize( [{"id": 1, "a": {"b": {"c1": [111, 222, 333], "c2":111}}, "x":"あああ", "y":"いいい", "z1":"ううう", "to_be_deleted":"消される運命", "lat":"35.01.23.456", "lon":"139.01.23.456"}, {"id": 2, "a": {"b": {"c1": [333, 444, 555]}}, "x": "えええ", "y": "おおお", "z2": "かかか", "lat": "35.01.23.456", "lon": "139.01.23.456"}, {"id": 3, "x": "えええ", "lat": "35.01.23.456", "lon": "139.01.23.456", "rel1": True, "rel2": "この値はクリアされる", "rel3": "この値はクリアされる"}, {"id": 4, "x": "えええ", "lat": "35.01.23.456", "lon": "139.01.23.456", "rel1": True, "rel2": "この値はクリアされる", "rel3": "この値はクリアされる"}, {"id": 5, "lat": "35.01.23.456", "lon": "139.01.23.456", "dumpobj": [{"one": 5, "two": 6, "three": [ 7, 8, 9], "four": None, "five": [], "six": [{"a": "100"}]}] } ] ) # -------------ここから変換処理 --------------------- """ PRN処理 ======================== # 親削除フラグ項目がTrueならクリアする。 良くある論理削除フラグと似たようなもので、グループ項目全体を無効にする意味のフラグがONの場合、配下の項目はユーザー画面には表示しない...という類のものは、当然Elasticsearchの検索対象、表示項目にしたくないためそれをクリアしておくというような用途をイメージ。 ※いきなり最もめんどくさいパターンからになるので、他のパターンを先に見てください。 """ # 関連するコンフィグを抜き出す。 「(親フィールド, 子フィールドの配列名一覧)」を抜き出す df_fields_and_parent = J2J_CONFIG_DF.loc[J2J_CONFIG_DF['PRN'].str.len() > 0].groupby(['PRN'])[ 'FN'].apply(list).reset_index() # 条件付きクリア for idx, row in df_fields_and_parent.iterrows(): flg_field = row['PRN'] clear_targets = row['FN'] # 親フィールドがTrueの行について、子フィールドの値を空文字列(この例でのクリアの扱い)に設定して代入しなおす。 df[clear_targets] = df[df[flg_field] == True][clear_targets].apply( lambda v: pd.Series(['' for s in clear_targets]), axis=1) """ プレクリーニング ======================== 要件上、不要な項目をこの時点で削除。欠損値の補完/項目削除。特定の値の場合は、欠損値扱いにして削除。 # (未実装) """ pass """ M2D処理 ==================== 表示名に変換 """ df_maps = J2J_CONFIG_DF.loc[J2J_CONFIG_DF['M2D'].str.len() > 0] for idx, row in df_maps.iterrows(): m = {i.split(':')[0]: i.split(':')[1] for i in row['M2D'].split(',')} fname = row['FN'] # 表示名を設定したフィールドを、「元のフィールド名+_M2D」に派生(代入) df[fname + '_M2D'] = df[fname].apply( lambda x: map2dispname(x, m)) """ VLS処理 ======================== JSONの配下のvalueを全てダンプする """ # 当該フィールド(dict型)のものについて、配下の全ての値のみ取得し、出力する df_dump_fields = J2J_CONFIG_DF.loc[J2J_CONFIG_DF['VLS'] == True]['FN'] for idx, val in df_dump_fields.iteritems(): fname = val df[fname + '_VLS'] = df[fname].apply(lambda x: all_values(x)) """ SH処理 ======================== 当該項目の内容を、コンフィグで指定された集約対象のフィールドにアペンドする。 情報源の上流システムがパッケージソフトなどによるRDBテーブルのカラム自動生成方式であり、入力画面では複数選択可能チェックボックスだが、選択肢ごとにRDBのテーブルの1カラムとなっているようなものを、検索時にキーワードでシンプルにヒットできるようにすることをイメージした変換。 ※言葉にするとややこしいので、シンプルバージョンのCNC処理を先に見た方が良い。 """ df_fields_into_one_array = J2J_CONFIG_DF.loc[J2J_CONFIG_DF['PSH'].str.len() > 0].groupby(['PSH'])[ 'FN'].apply(list).reset_index() for idx, row in df_fields_into_one_array.iterrows(): array_name = row['PSH'] fields = row['FN'] # ビジネスルール的視点ではアペンドだが、ここでは、対象フィールドの値の配列を作ってそれを代入 df[array_name] = df[fields].fillna('').values.tolist() """ CNC処理 ======================== これの処理対象の指定があるフィールドについて、指定項目の内容を、指定の(ここではCONCAT_FIELDという名前のDataFrameの列)にアペンドする。 実際は、前項のSH処理のシンプルバージョンである。 よって、SH処理でも代用できるが、CONCAT_FIELDというマジックフィールドをビジネスルール的に意識したいことから専用の論理にした。 """ # 所定の項目を全て取り込んだ文字列フィールド CONCAT_FIELD batch_concat_cols = J2J_CONFIG_DF.query('CNC == True')['FN'].tolist() batch_concat_cols = list(set(batch_concat_cols) & set(df.columns)) df['CONCAT_FIELD'] = df[batch_concat_cols].apply(series_to_arrfield, axis=1) """ # 緯度経度の形式変換 ====================== # 今回のストーリーとしてはそれほど意味をなさない例だが、df['xxx']への代入ではなく、DataFrame.assign()を使ってみた例。 # コンフィグによるものではなく、項目狙い撃ちのスペシャルな変換もあるよね...という例。 """ df = df.assign(xlon=df['lon'].apply(dtd)) df = df.assign(xlat=df['lat'].apply(dtd)) df = df.assign(location=df[['xlon', 'xlat']].values.tolist()) """ DL処理 ========================= # 不要なフィールドは削除する """ batch_del_cols = J2J_CONFIG_DF.query('DL == True')['FN'].tolist() batch_del_cols = list(set(batch_del_cols) & set(df.columns)) df = df.drop(labels=batch_del_cols, axis=1) # -------------ここから出力処理 --------------------- # JSON Lineで出力 df_lines = df.to_json( force_ascii=False, orient='records', lines=True) # 課題メモ: # この場合、JSONの最大プロパティ数が多いとnullや空の配列のデータが多く出力されるスパースなデータとなってしまう。 # Elasticsearchのバルクロード用の出力のJSON Lines # データをJSON Lines出力するだけなら、DataFrame.to_jsonで事足りるのだが、Elasticsearchのバルクロードは、アクション(正式名称は忘れた。POST先のインデックス名やCRUDのどれかなどを示すもの。)と対象レコードを1対とした繰り返し形式のため、ひとまずこんな感じで生で出力。 for i in iter(df_lines.split("\n")): v_json = json.loads(i) print('{"index": { "_id" : ' + str(v_json['id']) + '}}') print(i)
サンプルコードの実行結果
nullヌルだらけになってしまいましたが、これはまた。
{"index": { "_id" : 1}} {"a.b.c1":[111,222,333],"a.b.c2":111.0,"dumpobj":null,"id":1,"lat":"35.01.23.456","lon":"139.01.23.456","rel1":null,"rel2":null,"rel3":null,"x":"あああ","y":"いいい","z1":"ううう","z2":null,"a.b.c1_M2D":["NewYork","222","Texas"],"a.b.c2_M2D":"","dumpobj_VLS":"","newarray":["あああ","いいい","ううう",""],"CONCAT_FIELD":"いいい あああ ううう","xlon":139,"xlat":35,"location":[139,35]} {"index": { "_id" : 2}} {"a.b.c1":[333,444,555],"a.b.c2":null,"dumpobj":null,"id":2,"lat":"35.01.23.456","lon":"139.01.23.456","rel1":null,"rel2":null,"rel3":null,"x":"えええ","y":"おおお","z1":null,"z2":"かかか","a.b.c1_M2D":["Texas","Hawaii","555"],"a.b.c2_M2D":"","dumpobj_VLS":"","newarray":["えええ","おおお","","かかか"],"CONCAT_FIELD":"おおお かかか えええ ","xlon":139,"xlat":35,"location":[139,35]} {"index": { "_id" : 3}} {"a.b.c1":null,"a.b.c2":null,"dumpobj":null,"id":3,"lat":"35.01.23.456","lon":"139.01.23.456","rel1":true,"rel2":"","rel3":"","x":"えええ","y":null,"z1":null,"z2":null,"a.b.c1_M2D":"","a.b.c2_M2D":"","dumpobj_VLS":"","newarray":["えええ","","",""],"CONCAT_FIELD":" えええ ","xlon":139,"xlat":35,"location":[139,35]} {"index": { "_id" : 4}} {"a.b.c1":null,"a.b.c2":null,"dumpobj":null,"id":4,"lat":"35.01.23.456","lon":"139.01.23.456","rel1":true,"rel2":"","rel3":"","x":"えええ","y":null,"z1":null,"z2":null,"a.b.c1_M2D":"","a.b.c2_M2D":"","dumpobj_VLS":"","newarray":["えええ","","",""],"CONCAT_FIELD":" えええ ","xlon":139,"xlat":35,"location":[139,35]} {"index": { "_id" : 5}} {"a.b.c1":null,"a.b.c2":null,"dumpobj":[{"one":5,"two":6,"three":[7,8,9],"four":null,"five":[],"six":[{"a":"100"}]}],"id":5,"lat":"35.01.23.456","lon":"139.01.23.456","rel1":null,"rel2":null,"rel3":null,"x":null,"y":null,"z1":null,"z2":null,"a.b.c1_M2D":"","a.b.c2_M2D":"","dumpobj_VLS":",,5,6,,7,8,9,,,,,100","newarray":["","","",""],"CONCAT_FIELD":" ","xlon":139,"xlat":35,"location":[139,35]}
念のための免責事項
この記事は、実質pandasとElasticsearchの勉強メモです。 だれかの参考になればということで、○○風のデータがあれば、それをpandasでこんな加工をして、こんな感じで取り込めば、こういう活用ができるよねという技術メモをフリーハンドで記載したものです。
また、pandasやElasticsearchの各種コード例ですが、ひとまず私自身も含めて初学者の方がきっかけをつかむための、見てのとおりの完成度です。 言うまでもなく、そのままプロダクトに取り込めるレベルではないですし、この先は、できるだけ公式の情報源をあたっていただくとか、例えば、コーディングスタイルもpythonicなコードになるように磨き上げて抱く前提です。 (つまるところ、この記事の例は、そのレベルのものではないですよという話、言い訳です。念のため。)
*1:もともと位置づけからすると日曜大工ブログなので全体的に言いっ放しなのは宿命ですが...