はてだBlog(仮称)

私的なブログど真ん中のつもりでしたが、気づけばWebサイト系のアプリケーション開発周りで感じたこと寄りの自分メモなどをつれづれ述べています。2020年6月現在、Elasticsearch、pandas、CMSなどに関する話題が多めです。...ですが、だんだんとより私的なプログラムのスニペット置き場になりつつあります。ブログで述べている内容は所属組織で販売している製品などに関するものではなく、また所属する組織の見解を代表するものではありません。

続:Elasticsearch のバルクロード用JSON Lines ファイルをselectする toy スクリプト(ソートなど追加)

概要

この記事は次の記事の続きです。Elasticsearchのバルクロード用のJSON LinesファイルをイメージしたJSONの簡易フィルターコマンド相当のPythonでのツール例です。

経緯は下記の記事のとおりです。いわゆる拙作ではありますが、前回記事を書いたのちに自分の中で意外に便利な気がしたので、他にもバリエーションを追加してみました。

itdepends.hateblo.jp

インプットファイルは、次のリンク先の形式をイメージしています。以降コマンド例が出てきますが、json.jsonlinesというファイル名で保存されている想定です。

www.elastic.co

ツール例

以下ツール例です。

全体的に、

{"a":{"b":{"c":100}}}

というJSONがあれば、

cのフィールドに対して

「a.b.c」

のドット表記でフィールドを指定できるようなインタフェースになっています。

1. 命名 jsonlineselect (フィールド値が所定のものをselect)

def getobj(jsonstr, keystr, force_str=True):
    import json
    wrk = json.loads(jsonstr)
    for i in keystr.split('.'):
            if wrk.get(i):
                wrk = wrk[i]
            else:
                wrk = '' #ここはこのツールの限界になりうるがひとまずこの挙動としておく
                break             
    return str(wrk) if force_str else wrk

if __name__ == "__main__":
    import sys
    import re

    """
    cat json.jsonlines | python3 thisapp.py either index._id '^1'  first '^jo.*'
    """

    mode = sys.argv[1] #both/either/each
    
    ii = sys.argv[2]
    i_re = re.compile(sys.argv[3])
    
    jj = sys.argv[4]
    j_re = re.compile(sys.argv[5])
    
    jsons = []
    for l in sys.stdin:
        jsons.append(l.rstrip('\n'))
    
    for i, j in zip(jsons[0::2], jsons[1::2]):
        cond1 = re.findall(i_re, getobj(i,ii))
        cond2 = re.findall(j_re, getobj(j,jj))
        if (mode == 'both' and (cond1 and cond2)) or \
            (mode == 'either' and (cond1 or cond2) ):
            print(i)
            print(j)
        elif mode == 'each':
            if cond1: print(i)
            if cond2: print(j)
        else:
            PREFIX = 'STDERR\t'
            print(PREFIX + i,file=sys.stderr)
            print(PREFIX + j,file=sys.stderr)

走行例

$ cat json.jsonlines | python3 jsonlineselect.py  both index._id '^1'  first '^jo.*'
{"index":{"_id":1}}
{"first":"johnny","last":"日本語gaudreau","goals":[9,27,1],"assists":[17,46,0],"gp":[26,82,1],"born":"1993/08/13"}
STDERR  {"index":{"_id":2}}
STDERR  {"first":"sean","last":"monohan","goals":[7,54,26],"assists":[11,26,13],"gp":[26,82,82],"born":"1994/10/12"}
STDERR  {"index":{"_id":3}}
STDERR  {"first":"jiri","last":"hudler","goals":[5,34,36],"assists":[11,62,42],"gp":[24,80,79],"born":"1984/01/04"}
STDERR  {"index":{"_id":4}}
STDERR  {"first":"micheal","last":"frolik","goals":[4,6,15],"assists":[8,23,15],"gp":[26,82,82],"born":"1988/02/17"}
STDERR  {"index":{"_id":5}}
STDERR  {"first":"sam","last":"bennett","goals":[5,0,0],"assists":[8,1,0],"gp":[26,1,0],"born":"1996/06/20"}
STDERR  {"index":{"_id":6}}
STDERR  {"first":"dennis","last":"wideman","goals":[0,26,15],"assists":[11,30,24],"gp":[26,81,82],"born":"1983/03/20"}
STDERR  {"index":{"_id":7}}
STDERR  {"first":"david","last":"jones","goals":[7,19,5],"assists":[3,17,4],"gp":[26,45,34],"born":"1984/08/10"}
STDERR  {"index":{"_id":8}}
STDERR  {"first":"tj","last":"brodie","goals":[2,14,7],"assists":[8,42,30],"gp":[26,82,82],"born":"1990/06/07"}
STDERR  {"index":{"_id":39}}
STDERR  {"first":"mark","last":"giordano","goals":[6,30,15],"assists":[3,30,24],"gp":[26,60,63],"born":"1983/10/03"}
STDERR  {"index":{"_id":10}}
STDERR  {"first":"mikael","last":"backlund","goals":[3,15,13],"assists":[6,24,18],"gp":[26,82,82],"born":"1989/03/17"}
{"index":{"_id":11}}
{"first":"joe","last":"colborne","goals":[3,18,13],"assists":[6,20,24],"gp":[26,67,82],"born":"1990/01/30"}

2. 命名 jsonlineselect_sort (ソート用のフィールド名を指定してその順序にバルクロードファイルを並べる)

JSON Linesのアクション行(action)とペアとなるドキュメント行(doc)をひとかたまりに並べ替えます。

import itertools
import jsonlineselect
import sys
import json

"""
    cat json.jsonlines | python3 thisapp.py a_d index._id first 
"""

sorttype = sys.argv[1] #action/doc/a_d/d_a
    
ii = sys.argv[2]
jj = sys.argv[3]
    
jsons = []
for l in sys.stdin:
    jsons.append(l.rstrip('\n'))

c = itertools.count(1)
records = []
for i, j in zip(jsons[0::2], jsons[1::2]):
    funcmap = {
        'action': lambda a,d: a,
        'doc': lambda a,d: d,
        'a_d': lambda a,d: a + '___' +  d,
        'd_a': lambda a,d: d + '___' +  a
    }
    a = jsonlineselect.getobj(i,ii)
    d = jsonlineselect.getobj(j,jj)
    records.append({'sortkeyval': funcmap[sorttype](a, d),'n': next(c),'i': i, 'j': j}) 
    
for r in sorted(records, key=lambda r: r['sortkeyval']):
    print(r['i'])
    print(r['j'])

走行例

$ cat json.jsonlines | python3 jsonlineselect_sort.py doc index._id  first
{"index":{"_id":7}}
{"first":"david","last":"jones","goals":[7,19,5],"assists":[3,17,4],"gp":[26,45,34],"born":"1984/08/10"}
{"index":{"_id":6}}
{"first":"dennis","last":"wideman","goals":[0,26,15],"assists":[11,30,24],"gp":[26,81,82],"born":"1983/03/20"}
{"index":{"_id":3}}
{"first":"jiri","last":"hudler","goals":[5,34,36],"assists":[11,62,42],"gp":[24,80,79],"born":"1984/01/04"}
{"index":{"_id":11}}
{"first":"joe","last":"colborne","goals":[3,18,13],"assists":[6,20,24],"gp":[26,67,82],"born":"1990/01/30"}
{"index":{"_id":1}}
{"first":"johnny","last":"日本語gaudreau","goals":[9,27,1],"assists":[17,46,0],"gp":[26,82,1],"born":"1993/08/13"}
{"index":{"_id":39}}
{"first":"mark","last":"giordano","goals":[6,30,15],"assists":[3,30,24],"gp":[26,60,63],"born":"1983/10/03"}
{"index":{"_id":4}}
{"first":"micheal","last":"frolik","goals":[4,6,15],"assists":[8,23,15],"gp":[26,82,82],"born":"1988/02/17"}
{"index":{"_id":10}}
{"first":"mikael","last":"backlund","goals":[3,15,13],"assists":[6,24,18],"gp":[26,82,82],"born":"1989/03/17"}
{"index":{"_id":5}}
{"first":"sam","last":"bennett","goals":[5,0,0],"assists":[8,1,0],"gp":[26,1,0],"born":"1996/06/20"}
{"index":{"_id":2}}
{"first":"sean","last":"monohan","goals":[7,54,26],"assists":[11,26,13],"gp":[26,82,82],"born":"1994/10/12"}
{"index":{"_id":8}}
{"first":"tj","last":"brodie","goals":[2,14,7],"assists":[8,42,30],"gp":[26,82,82],"born":"1990/06/07"}

3. 命名 jsonlineselect_fieldselect(各行のうち指定のフィールド名の値のみ抜き出します)

これは前2つと違いペアリングはさほど意識していません。

def select_fields(jsonstr, fields):
    import os
    import jsonlineselect
    import json
    wrk = {}
    for f in fields:
        tmp = {}
        if _val := jsonlineselect.getobj(jsonstr, f, force_str=False):
            tmp = _val
            for i in reversed(f.split('.')):
                _d = {}
                _d[i] = tmp
                tmp = _d
        wrk = dict(**wrk,**tmp) 

    sort_keys = False
    _jsk = os.environ.get('JSON_SORT_KEYS')
    if _jsk == 'True':
        sort_keys = True
    return json.dumps(wrk,ensure_ascii=False,sort_keys=sort_keys)

if __name__ == "__main__":
    """
    cat json.jsonlines | python3 thisapp.py index._id  first goals
    """
    import sys

    fields = [ i for i in sys.argv[1:]]
        
    jsons = []
    for l in sys.stdin:
        jsons.append(l.rstrip('\n'))
    
    for i in jsons:
        print(select_fields(i,fields))
        

走行例 (エラーチェックはしていないので、何かあればすぐ自爆します。引数の指定の仕方に注意が必要ですが、複数指定可能です。)

$ cat json.jsonlines | python3 jsonlineselect_fieldselect.py index._id  first goals
{"index": {"_id": 1}}
{"first": "johnny", "goals": [9, 27, 1]}
{"index": {"_id": 2}}
{"first": "sean", "goals": [7, 54, 26]}
{"index": {"_id": 3}}
{"first": "jiri", "goals": [5, 34, 36]}
{"index": {"_id": 4}}
{"first": "micheal", "goals": [4, 6, 15]}
{"index": {"_id": 5}}
{"first": "sam", "goals": [5, 0, 0]}
{"index": {"_id": 6}}
{"first": "dennis", "goals": [0, 26, 15]}
{"index": {"_id": 7}}
{"first": "david", "goals": [7, 19, 5]}
{"index": {"_id": 8}}
{"first": "tj", "goals": [2, 14, 7]}
{"index": {"_id": 39}}
{"first": "mark", "goals": [6, 30, 15]}
{"index": {"_id": 10}}
{"first": "mikael", "goals": [3, 15, 13]}
{"index": {"_id": 11}}
{"first": "joe", "goals": [3, 18, 13]}

標準入力を受け取って、標準出力に流すので、複数パイプすることもできます。