はてだBlog(仮称)

私的なブログど真ん中のつもりでしたが、気づけばWebサイト系のアプリケーション開発周りで感じたこと寄りの自分メモなどをつれづれ述べています。2020年6月現在、Elasticsearch、pandas、CMSなどに関する話題が多めです。...ですが、だんだんとより私的なプログラムのスニペット置き場になりつつあります。ブログで述べている内容は所属組織で販売している製品などに関するものではなく、また所属する組織の見解を代表するものではありません。

JavaScriptでもmapやreduce、filter(手習いメモ)

今更ですが、JavaScript(私がいるのはTypeScriptでもなくて、やっとES6ぐらいに合流した世界線です)にもmapやreduceがあることを知りまして、世間からの何周遅れを追いつきたく、手習いしてみた...という自分メモです。

個人的なmapやreduceあるいはこれらと相性の良い便利な関数の使い所を自分の中で再確認してみたものを、特に深く考えずに詰め込みしています。

無駄に卑下するものではありませんが、特別なことは書いてない私的なメモでございます。

ただ、JavaScriptのmapやreduceやこれらの眷属の関数は、なんと配列の要素はもちろん、現在の要素の添字や配列全体を取得できるというこれまた便利な味付けがされているので、多少、添字が使えると、伝統的なmapよりも、ちょっとしたことができるな〜と思った例に多少は寄せてあるつもりです。

組み合わせ(ぽいやつ)

組み合わせを作る

/* 複数の配列を与えて、各配列の要素の全ての組み合わせの「組合せ」を生成する */

// サブ関数1
// [1,2,3] と 4 -> [1,4],[2,4],[3,4]
const tuples = (xa, y) => xa.map(x => [x, y].flat());

// サブ関数2
// [1,2,3] と [4,5,..] -> [1,4],[1,5],...,[2,4],[2,5]
const tupless = (xa, ya) => ya.map(y => tuples(xa, y)).flat();

// メイン関数
// [1,2,...][4,5,..][8,9,...] -> [1,4,8],[1,5,8],...
const get_comb = (a_a) => a_a.reduce((xarr, yarr) => tupless(xarr, yarr));

// サンプルデータを食わせる〜出力して確認

const a = ['1', '2', '3'];
const b = ['4', '5', '6'];
const c = ['8', '9', '10', '11', '12'];
const a_a = [a, b, c];

const comb = get_comb(a_a);

comb.forEach(i => {
    console.log(i.join('---'));
});

組み合わせを間引く

/* 組合せから、特定の組合せのパターンを間引く */

const same = (a, ba) => ba.map(b => JSON.stringify(b) === JSON.stringify(a)).some(Boolean);
const combf = comb.filter(v => (!same(v, [['1', '4', '8'], ['2', '4', '8']])));
// 注:combは先述の例参照

combf.forEach(i => {
    console.log(i.join('---'));
});

ネストされたJSONデータ

/* Elasticsearchのaggs(2階層型)の戻り値(風のデータ)から、組み合わせのタプルを抜き出す */
// ↓ livedoorのレストランデータ由来のデータ
const aggs = {
    "a1": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [
            {
                "key": "13__東京都",
                "doc_count": 64565,
                "a2": {
                    "doc_count_error_upper_bound": 649,
                    "sum_other_doc_count": 59946,
                    "buckets": [
                        {
                            "key": "和食",
                            "doc_count": 23243
                        },
                        {
                            "key": "居酒屋・ダイニングバー",
                            "doc_count": 13153
                        },
                        {
                            "key": "西洋料理",
                            "doc_count": 10980
                        }
                    ]
                }
            }
        ]
    }
}

const bk = aggs.a1.buckets;

const shapedBucketAsMap = (bk, af) => bk.reduce((a, c) => { a[c.key] = c[af].buckets; return a; }, {});
let buckinfo = shapedBucketAsMap(bk, 'a2');
console.log(buckinfo);

const nestbuc2Tuple = (bk, af) => bk.map(v1 => v1[af].buckets.map(v2 => [v1.key, v2.key]));
buckinfo = nestbuc2Tuple(bk, 'a2').flat();
console.log(buckinfo);

↓ 出力イメージ

> console.log(buckinfo);

{
  '13__東京都': [
    { key: '和食', doc_count: 23243 },
    { key: '居酒屋・ダイニングバー', doc_count: 13153 },
    { key: '西洋料理', doc_count: 10980 }
  ]
}
undefined
> 

> const nestbuc2Tuple = (bk, af) => bk.map(v1 => v1[af].buckets.map(v2 => [v1.key, v2.key]));
undefined
> buckinfo = nestbuc2Tuple(bk, 'a2').flat();
[
  [ '13__東京都', '和食' ],
  [ '13__東京都', '居酒屋・ダイニングバー' ],
  [ '13__東京都', '西洋料理' ]
]
> console.log(buckinfo);
[
  [ '13__東京都', '和食' ],
  [ '13__東京都', '居酒屋・ダイニングバー' ],
  [ '13__東京都', '西洋料理' ]
]

GroupBy

/* オブジェクトの配列を各オブジェクトのあるプロパティでGroupby */
const group = (objArr, getKeyFunc) =>
    objArr.reduce((grp, o, i, s) => {
        const gname = getKeyFunc(o, i, s);
        (grp[gname] || (grp[gname] = [])).push(o);
        return grp;
    }, Object.create(null));

const data = [
    { id: '111', name: 'A', p: 100 },
    { id: '222', name: 'B', p: 50 },
    { id: '333', name: 'C', p: 100 },
    { id: '444', name: 'C', p: 50 },
    { id: '555', name: 'D', p: 70 },
];

const getfn = (o, i, s) => 'グループ_' + o.name;
console.log(group(data, getfn));

↓ 出力イメージ

 {
  'グループ_A': [ { id: '111', name: 'A', p: 100 } ],
  'グループ_B': [ { id: '222', name: 'B', p: 50 } ],
  'グループ_C': [ { id: '333', name: 'C', p: 100 }, { id: '444', name: 'C', p: 50 } ],
  'グループ_D': [ { id: '555', name: 'D', p: 70 } ]
}

配列のランダム化

/* 配列の並びのランダム化 */
// ただし、Math.randomそのもののエントロピーはここでは十分なものとみなしてください。
const randomize = arr_ => {
    const arr = [...arr_];
    return arr_.map(() => arr.splice(Math.floor(Math.random() * arr.length), 1)[0]);
};

> randomize([1,2,3,4,5,6])
[ 6, 3, 2, 4, 5, 1 ]
> randomize([1,2,3,4,5,6])
[ 6, 3, 4, 1, 5, 2 ]
> randomize([1,2,3,4,5,6])
[ 1, 6, 2, 5, 3, 4 ]
> randomize([1,2,3,4,5,6])
[ 3, 1, 6, 5, 2, 4 ]
> randomize([1,2,3,4,5,6])
[ 4, 6, 1, 2, 5, 3 ]
> 

Trie風のデータ

/* trie 風のデータ構造を元の文字にに復元する */
// {'a':['b','bc','bcb','d'],..} --> [['ab','abc','abcb','ad'],...]
function decode_trie1(a) {
    return Object.keys(a).map(k => a[k].map(item => `${k}${item}`));
}

decode_trie1({ 'a': ['b', 'bc', 'bcb', 'd'], 'c': ['d'] })

/* trie 風のデータ構造を元の文字にに復元するその2 */
// {'a':['b','bc','bcb','d'],..} --> {a:['ab','abc','abcb','ad'], ...}
function decode_trie2(a) {
    return Object.keys(a).reduce((acm, c) => {
        acm[c] = a[c].map(item => `${c}${item}`);
        return acm;
    }, Object.create(null));
}

decode_trie2({ 'a': ['b', 'bc', 'bcb', 'd'], 'c': ['d'] })

配列のチャンク分割

/* 配列のチャンク分割 */
const chunksize = 3;
const renban = [...Array(chunksize * 10)].map((_, i) => i); // mapに引数
const chunk = (arr, size) => {
    return arr.reduce((a, c, i) => {
        (i % size === 0)
            ? a.push([c])
            : a[a.length - 1].push(c);
        return a;
    }, []);
};

chunk(renban, chunksize);

> chunk(renban, chunksize);
[
  [ 0, 1, 2 ],
  [ 3, 4, 5 ],
  [ 6, 7, 8 ],
  [ 9, 10, 11 ],
  [ 12, 13, 14 ],
  [ 15, 16, 17 ],
  [ 18, 19, 20 ],
  [ 21, 22, 23 ],
  [ 24, 25, 26 ],
  [ 27, 28, 29 ]
]
> 

配列の要素のユニーク化(setだと難しいもの)


/* 順番を維持しつつユニーク: 最初に現れたものを残す (srcとiがあるので便利) */
[1, 3, 1, 2, 4, 5, 1, 6, 5, 6, 8, 9, 1].filter((el, i, src) => src.indexOf(el) === i);

/* 最後残しユニーク(srcとiがあるので便利) */
[1, 3, 1, 2, 4, 5, 1, 6, 5, 6, 8, 9, 1].filter((el, i, src) => src.indexOf(el, i + 1) === -1);

/* 最後残しユニークをreverseを使って実現 */
[1, 3, 1, 2, 4, 5, 1, 6, 5, 6, 8, 9, 1].reverse().filter((el, i, src) => src.indexOf(el) === i).reverse();

/* reverseを独自実装(reduceがシンプルかも) ※言語の標準は破壊的だが、これはそうではない */
// https://developer.mozilla.org/ja/docs/Web/JavaScript/Reference/Global_Objects/Array/reverse
[1, 3, 1, 2, 4, 5, 1, 6, 5, 6, 8, 9, 1].reduce((acm, c) => [c].concat(acm));

/* 重複しているものを残す */
[1, 3, 1, 2, 4, 5, 1, 6, 5, 6, 8, 9, 1].filter((el, i, src) =>
    src.slice(0, i ? i - 1 : 0).indexOf(el) > -1
    || src.slice(i + 1).indexOf(el) > -1
);


everyとsome

検査に便利ですよね(everyとsome(any))

Array.prototype.every() - JavaScript | MDN

// ウォーミングアップ
cond = v => v > 2;
[1, 2, 3, 4].every(cond);
[10, 20, 30, 40].every(cond);
[1, 2, 3, 4].some(cond);

/* ユニークになっているか(Setは使わない方法) */
[10, 11, 12, 13].every((v, i, src) => src.indexOf(v) === i);

/* 後ろのものほど、大きくなっているか。*/
const nums = [10, 11, 12, 13];
nums.every((v, i, src) => src.slice(i + 1).findIndex(el => el < v) === -1);
nums.every((v, i, src) => src.slice(i + 1, i + 2).findIndex(el => el < v) === -1);

// undefinedを意識的に使って良いルールのもとでは、JavaScriptだともっとはっきり記述できる
nums.every((v, i, src) => src[i + 1] === undefined ? true : src[i + 1] > v);

findIndexも添字や元の配列を関数内で使ってfindできる

/* findIndexが思ったよりいろんなことができる...かも */
// 自身より3大きい数値が自身の前に2連続現れるものをfind(この例だと、13が該当するため、3が得られる。)
[10, 16, 16, 13, 14, 15, 16].findIndex((el, i, src) => {
    const x = el + 3;
    const pre = i - 2;
    return pre >= 0 ? src.slice(i - 2, i).every(x_ => x_ == x) : false;
});

自分の中ではヒット作なので、もう少し例がたまったらまたやるかも...

参考リンク

developer.mozilla.org

developer.mozilla.org

github.com

qiita.com

HTTPとTCPの関係を状況証拠的に調べてみた(実際は、node.jsのメジャーなHTTPクライアントライブラリの挙動確認ぽいことをしてみた)話

TCPのKeepAliveと、HTTPの「Connection:Keep-Alive」、HTTP1.1のパイプライン、HTTP/2の「マルチストリーム?」、あとメジャーなブラウザは「コネクションを6本張るものが多い」っていう話がやっぱり分かってなかったので少し調べ始めたのですが、結果的には多少話を絞ってHTTP1.1のHTTPデーモンに見立てたElasticsearchにいくつかメジャーなHTTPクライアントライブラリで接続して見て様子を確認してみました... という自分メモです。

f:id:azotar:20210323222728p:plain

登場人物

node.js : v12.16.1

express.js: 4.17.1

Elasticsearch: 6.8

axios: 0.21.1

node-fetch: 2.0.2

Elastic社のElasticsearch JavaScript クライアント: 6.8.8

実験用プログラム(foo.js)

上記でいうところのnode.jsで稼働させる実験用プログラムです。

やっていることは単純なのですが、確認するための条件をとりはからうロジックと複数の試験パターンをぐるぐる回すためのロジックがフラットに現れてくることにご注意ください。

起動方法や実験内容

コマンドラインで次のように起動すると、node.jsのExpress.jsの常駐プロセスが起動します。

node    foo.js    ◆◆◆

◆◆◆: KA、KA_MS、NKA、それ以外の4種類が設定可能です。

KA: http.AgentのkeepAliveプロパティをtrue KA_MS:http.AgentのkeepAliveプロパティをtrue、同maxSocketsプロパティを指定(このコードでは、5を指定) NKA:http.AgentのkeepAliveプロパティをfalse それ以外:http.Agentを明示的に生成しない(各HTTPクライアントライブラリのデフォルトの挙動になることを期待)

起動したnode.jsサーバに対し、次でアクセスします。

http://localhost:3000/?cltype=●●&outer_or_inner=▲▲▲▲

●●: ax、ft、esのいずれかを指定します。それぞれaxios、node-fetch、Elastic社が公開しているElasticsearchのJavaScriptクライアントライブラリを利用して、Elasticsearchに複数回非同期アクセスします。

▲▲▲▲: inner か outerを指定します。前者の場合、●●のライブラリのクライアントインスタンスを、Express.jsの「getルーティング」内で初期化します。 後者の場合は、node.jsのこのアプリが起動する際に初期化して、実際にElasticsearchと検索クエリのやりとりをするのは「getルーティング」内としています。 後述のとおり、私が勝手に想定した挙動とは違う実験結果とはなったのですが、クライアントライブラリのインスタンスの初期化場所次第で、コネクションの使いまわされ具合がどのように変わるのだろうということを調べたく2パターン用意したものです。

なお、実際のところ、node-fetchは、インスタンスをnewする処理と実際にElasticsearchと検索クエリのやりとりをする部分を分ける方法が分からなかったので、「getルーティング」内で初期化と実際の「検索アクセス」を実行するパターンのみなので、innerとouterの指定による区別はありません。

確認方法の考え方は次のような感じです。

末端のクライアントからの1回目のアクセスを受けた、node.jsが、Elasticsearchへの複数回のアクセスを行う、このアクセスの前後で、当該マシン上のTCPコネクションの状況を前後比較し、コネクション数の増減などを見て、「コネクション」の使い回し状況を推し測ろうというものになっています。

実験結果

ということで、◆◆◆ や ●●、▲▲▲▲を変えてみながら、何が起きるか確認して、次のような結果に至っています。字が小さいですね...

f:id:azotar:20210323222813p:plain

3種で比べてみたものの、比較対象の3種のクライアントともに末端ではnode.js標準のhttp/httpsモジュールを利用しているようで、TCPコネクションの利用のされ方という意味では、初期化の際にうけとったhttp.Agentの「keepAlive」での方針に従う、ということでそれぞれ大きな違いがないというところでしょうか。 *1

あと、(私にとって意外なだけで、node.js/JavaScript界隈のシングルスレッドモデルなどもろもろの仕組みなどからするとそうでもないのかもしれませんが)意外なことに、AxiosとFetchについては上記の区分でいうところのグローバルとローカル(クエリパラメータだとinnerとouter)のどちらでクライアントのインスタンスを初期化しても、TCPコネクションの再利用のされぐあいは変わらないみたいです*2

実験方法・プログラムの補足

上の方でそれとなくふれましたが、3種ともに、(おそらく)http/httpsモジュールを使うところの裏返しですが、http.Agentというクラスを元にした通信用のコンフィグが通るように見えます。

nodejs.org

http.Agentの「keepAlive」オプションをtrueにして、httpモジュール(のおそらくrequestメソッド)が呼び出される形でHTTP通信を行うと、request送受信が終わった後も、TCPのコネクションを保持し、次回のHTTP通信に(新規にTCPセッションを開始するのではなく)このコネクションを使うという仕掛けになるようです。

なお、上記のプログラム例では、node.jsのサーバを3000番ポートでlistenさせています。 また、対向のElasticsearchは、9200番ポートで起動させています。

よって、netstat -n コマンドで9200番ポート関係のポートの状態を確認することで、コネクションの生き様がわかるでしょうという観測の仕方になっています。

tcp4       0      0  127.0.0.1.56105        127.0.0.1.9200         ESTABLISHED
tcp4       0      0  127.0.0.1.56106        127.0.0.1.9200         ESTABLISHED
tcp4       0      0  127.0.0.1.62108        127.0.0.1.9200         ESTABLISHED
tcp4       0      0  127.0.0.1.62109        127.0.0.1.9200         ESTABLISHED
...
tcp4       0      0  127.0.0.1.9200         127.0.0.1.56105        ESTABLISHED
tcp4       0      0  127.0.0.1.9200         127.0.0.1.56106        ESTABLISHED
tcp4       0      0  127.0.0.1.9200         127.0.0.1.62108        ESTABLISHED
tcp4       0      0  127.0.0.1.9200         127.0.0.1.62109        ESTABLISHED
tcp4       0      0  127.0.0.1.9200         127.0.0.1.62110        ESTABLISHED
...

※ node.jsプロセスは常駐するので、所定の試行の前後で、netstat -n のコマンド結果を見て、維持されているTCPセッションの状態を状況証拠的ですが確認できます。

TCPコネクションの有様の理解にあたって今回少し賢くなったこと

TCPコネクションについて、netstat で様子をみてみつつ、できるだけ理屈や仕組みも理解して解釈しようということで、新たに見知ったことなどをつらつらと。

1) UNIX系OSでは、 なんでもファイルで抽象化してくれているので、ソケット通信(この場合はTCPセッション)の1接続に対して、ファイルディスクリプタを1つ使う。 *3

2) なので、ファイルディスクリプタの様子をウォッチしていれば、見えてくるところもある。もちろん深く掘り下げるなら、netstatとかの方が良いでしょう。

3) HTTP1.1サーバに見立てた、Elasticsearch(以下Es)では、Esがオープンしている「ファイルディスクリプタ」、およびオープン可能な最大数が、次のAPIで確認できる。

www.elastic.co

今回、netstatを表示してみつつも、一応、kibanaでElasticsearchのアクティブなファイルディスクリプタ 数の増減も確認して、netstatの様子と呼応していることがみて取れました。

なお、最大オープン可能数は、Linux自体は、ulimit他で調整できますし、Elasticsearchとしては次の説明の設定で変更できます。

www.elastic.co

4) HTTP1.1 のコネクション制御モデル(Short-lived、Persistent、Pipeline)

developer.mozilla.org

これまで、ブラウザ界隈できくHTTP1.1の話かHTTP/2の話かはよくわからず、(HTTPはセッションごとにTCPコネクションを張るものの、そのオーバーヘッドが大きく、新しいHTTPではコネクションを再利用する云々...というところから)当初、HTTP1.1のPipeline(以下パイプライン)の様を見ることができるのかなと想像していましたが、先述のとおり、今回のサーバ間接続の実験モデルでは、どちらかと言えば、気ままにTCPコネクション数を増やして並行度をあげておき、以降はコネクションを貼り続けたままにするというアプローチに見える。... でした。

なお、ブラウザとサーバの間だけかもしれないが、「パイプライン」は失敗だったという評価らしいので、パイプライン関連のトラブルではという心配はしなくて良さそう。 (関連ワード HOL Blocking)

※「パイプライン」は思ったより使われていないということは、同じTCPコネクションを使うことになった複数のHTTPセッションについては、同じTCPコネクション上でシリアライズされるので、多分だが長めのタスクはいつもほどほどのボトルネックとして表に現れやすいので、下手にパイプライン化されて出たり出なかったりと覆いかぶされるよりは、比較的挙動が想像しやすい、対処すれば効果が出やすい、効果が出る期待があるので少し無理をしてみようという気にもなるな...と捉えられるなとも思ったりしました。

5)HTTP/2 で、ひとつのTCPセッション上で複数のHTTPのリクエストを同時に扱うために、「ストリーム」の考え方が導入されている。 HTTP1.1の「パイプライン」ではない! 別物。HTTP1.1より前のパケットには HTTP/2のパケット(フレーム)では、ストリーム識別用のStream Identifierというフィールドがある。

6) 時代はすでにHTTP/3か? ただ、LAN内であれば、太いコネクションを本数をほどほどに抑えてというアプローチでHTTP1.1ベースで掘り下げてチューニングするのもありかと思った。 ただ、「サーバレス」とか「コンテナ」みたいなマイクロサービスっぽいところだとサーバ間(サーバレスと言ったのに奇妙な言い方ですが...)でも、今までのブラウザとサーバのHTTPの使い方に似た形になってきそうな気もするので、頭を柔らかく保っておく必要がありそう。

*1:状況証拠的にそう見えるというものであり、printfトレースまではしていないので、勘違いしているかもしれません。

*2:どちらでも変わらなそうに見えたから気にしないというよりは、思わぬところで思わぬ飽和とかに繋がりそうなので、むしろ注意が必要そうという話。

*3:厳密には、ファイルディスクリプタを一つ消費して、ソケット通信を抽象化という関係の方が正しい捉え方かもしれませんが....

Pythonのconcurrent.futuresのシンタックスに入門してみた(並列タスク実行)

はじめに

以前、PythonのIO関連の非同期処理をうまくコントロールして全体スループットをあげるということで、次の記事を投稿しました。

itdepends.hateblo.jp

今回はもっとストレートに、concurrent.futuresという並列タスク実行の標準パッケージがあるようなので、こちらに入門してみた...というメモです。

concurrent.futures 公式サイトと要点

docs.python.org

公式サイトをそのつもりで読めば分かるのですが*1、プロセス版とスレッド版があります。

どちらも、concurrent.futures.Executor という抽象クラスで定義されたインタフェースで統一的に使えますので、実のところ、データ並列性、タスク並列性の高い処理をさせるという範囲であれば、公式等のサンプルコードを見よう見まねで実行してみると、こういうシンタックスのモデルなのねというのがすぐに分かりますし、使い始められそうです。逆にいうと、実際に実行してみた方が良いですね。

特に、メソッドとして覚えておいた方が良いのが、Futureを返す、submitと、計算結果として得られた「何か」のイテレータを戻すmapメソッドでしょうか。

どちらも、並列実行させたい「関数」と引数をパラメータに取ります。

concurrent.futuresに慣れるためのサンプルコード1

import concurrent.futures

NUMS = range(5)


def pow(n):
    return n * n


def conc_map(executor):
    with executor() as e:
        it = e.map(pow, NUMS)
        for i in it:
            print(i)


def conc_submit(executor):
    with executor() as e:
        fts = [e.submit(pow, i) for i in NUMS]
        for i in fts:
            print(i.result())


if __name__ == '__main__':
    P = concurrent.futures.ProcessPoolExecutor
    T = concurrent.futures.ThreadPoolExecutor
    # practice1
    conc_submit(P)
    conc_submit(T)
    conc_map(P)
    conc_map(T)

実行結果

以下、実行結果です。独立した軽い演算を多重化しているだけなので、並列・並行・非同期になっているかは分かりづらいですが...

なお、Future、イテレータに戻り値を預けるスタイルなので、前者は「Future.result()」で戻り値を待ち合わせ取得できますし、後者は「next」でイテレーションできますので、戻り値を使いたいタイミングで、イテレータを回すことで良いようです。

0
1
4
9
16
0
1
4
9
16
0
1
4
9
16
0
1
4
9
16

ここで、

concurrent.futures.ProcessPoolExecutor

concurrent.futures.ThreadPoolExecutor

がそれぞれ名前のとおりのExecutorなのですが、この程度の例だと違いは見えてきませんね。一方で、プログラミングスタイルというか、シンタックスとしては、両者は統一的なやり方で取り回せることが見て取れます。

なお、asyncio含めて、concurrent.futuresのプロセス版、スレッド版の兄弟の使いどころから見た使い分けは、次の書籍で簡潔に述べられていますので、頭の整理に参考にさせていただきました*2

Python実践入門 ── 言語の力を引き出し、開発効率を高める WEB+DB PRESS plus

concurrent.futuresに慣れるためのサンプルコード2

サンプルコードその2です。

次のポイントを確かめたくて、「map」に絞りましたが、少し例を複雑にしていました。 (最後に待ち合わせはあるものの独立性の高い処理を多重化しているので、そんなに難しくはありません。)

1) 複数の引数を取る関数を多重化したい場合の表現方法 2) 多重度の制御(max_workersパラメータ)

import urllib.request
import concurrent.futures

def load_url(url, second_arg):
    print(second_arg)
    with urllib.request.urlopen(url) as conn:
        return conn.read()


def conc_practice_map(task_num, executor, max_workers):
    TASK_NUM = task_num
    URLS = ['http://localhost:8081/'] * TASK_NUM
    bar = ['dummy_' + str(i) for i in range(TASK_NUM)]
    with executor(max_workers=max_workers) as e:
        it = e.map(load_url, URLS, bar)
        for i in it:
            print(i)


if __name__ == '__main__':
    P = concurrent.futures.ProcessPoolExecutor
    T = concurrent.futures.ThreadPoolExecutor
    # practice2
    TN = 5
    MW = 3
    conc_practice_map(TN, P, MW)

http://localhost:8081/」にHTTPアクセスする(それ自体はほとんど意味がない)プログラムです。 手元の環境では、http://localhost:8081/ では、とにかく5秒経過してから、意味のないJSONを応答するサーバを起動しています。

このエンドポイントに、変数TNで定義される回数のアクセスを、最大同時実行数を変数MWで指定された値として、多重度をコントロールしながらも、並列実行するという想定のものになっています。

結果は割愛しますが、TNやMWの値を変えたりして、挙動や待ち合わせ具合を確認することができます。

なお、TN=5、MW=3 で実験したところ、走行時刻をtimeコマンドで計測するとおおよそ10秒強でした、3並列で2巡(1巡あたりサーバ側の5秒縛り)なので、おおよそ10秒という計算に合いそうです。

この項以上

追伸

上記の「 http://localhost:8081/」 では、とにかく5秒経過してから、意味のないJSONを応答するサーバ は次の記事で遊んでみたものを利用しています。

itdepends.hateblo.jp

*1:私は腹落ちするまで時間を要しましたが...

*2:...というところで、この記事ではそこにはふれません。スレッド版は有名(?)な、「GIL」の話がからんでくるようです。

Elasticsearchのネストしたaggsの検索結果をflat形式で戻してくれるComposite aggregationのさわり

拙ブログに、「Elasticsearch aggs/aggregations flat」で検索して迷いこむ方がいらっしゃるようです。

flat」というところから、もしかして?、と思いまして、あらためて Elasticsearch のAggregationのひとつのCompositeに入門してみましたので、せっかくお越しいただいた方の参考になるかもということで、ご紹介します。

www.elastic.co

確認した、Elasticsearchのバージョンは6.8です。

なお、このブログでComposite以外のAggregationを試した他の記事はこちらです。

itdepends.hateblo.jp

itdepends.hateblo.jp

itdepends.hateblo.jp

Composite aggregationと他のaggsの違い(compositeの嬉しいこと:検索結果がflatな配列で戻る)

Composite aggregation以外の例を先にご紹介してから、対比でComposite aggregationのメリットをお伝えすることにします。

他の記事にも例示しましたが、ElasticsearchのAggregationは、バケットの組み合わせ・掛け合わせができます。

例えば、Compositeではない(おそらくもっとメジャーな普通の)Aggregationの例では、次のクエリで(もちろんインデックスの作り次第ですが)、 都道府県ごとの、該当カテゴリごとの飲食店数が得られます。

POST /飲食店インデックス/_search
{
  "aggs": {
    "bk1": {
      "terms": {
        "field": "都道府県",
        "size": 50
      },
      "aggs": {
        "bk2": {
          "terms": {
            "field": "飲食カテゴリ",
            "size": 100
          }
        }
      }
    }
  }
}

このクエリでは、検索結果の表現方法が、都道府県が親で飲食カテゴリが子のJSONとなります。

例えば、次のようなイメージです。

{
    bk1:{
        buckets:[
            {
              key:東京都,
              bk2:{
                buckets:[
                   {
                    key:ラーメン,
                    doc_count: 1000
                   },
                   {
                    key:フレンチ,
                    doc_count: 500
                   }
                ]
              }
            },
        ]
    }
}

ここで、好みの範囲なのですが、上記の例であれば、次のような戻し方をしてくれると嬉しいな〜と思いませんか。 このような戻り値の形式、ある種のflat/フラットな表現で結果が得られると嬉しいなというのが、「Composite Aggregations」です。

{
    buckets:[
       {
        key1:東京都,
        key2:ラーメン,
        doc_count: 1000
       },
       {
        key1:東京都,
        key2:フレンチ,
        doc_count: 500
       }
       ...
    ]
}

Composite aggregationクエリの例

ということで、「Composite aggregation」の例です。

「compostite」プロパティで制御します。また、ポイントは「sources」の中です。 ちなみに、terms以外も、ヒストグラムなどいくつかの、集計の方法が指定できます。

GET /インデックス名/_search
{
  "aggs": {
    "my_buckets": {
      "composite": {
        "sources": [
          { "prefecture": { "terms": { "field": "集計対象のフィールド_都道府県"} } },
          { "category": { "terms": { "field": "集計対象のフィールド_レストランカテゴリ" } } }
          /* ここに他にも条件を加えたければ追加可能 */
        ],
        "size": 10000
      }
    }
  }
}

結果の例

先のクエリの戻りは次のようになります。

トップレベルのbucketsの中で、都道府県と飲食カテゴリの組み合わせの配列が得られますね。

{
  "aggregations" : {
    "my_buckets" : {
      "buckets" : [
        {
          "key" : {
            "prefecture" : "東京都",
            "category" : "焼き鳥"
          },
          "doc_count" : 2181
        },
        {
          "key" : {
            "prefecture" : "東京都",
            "category" : "焼きとん",
           },
          "doc_count" : 18
        },
        {
          "key" : {
            "prefecture" : "東京都",
            "category" : "焼きそば・焼きうどん",
          },
          "doc_count" : 120
        },
        ...

}

注意事項など

Compositeに限らず、Aggregationは重い処理なので、性能などや、sizeの指定件数等はよろしくお気をつけください。

それ以外では、他のAggregationにぶら下げる無印の「terms」と次のようなところが異なるところです。

1) 取得件数は、compositeプロパティの直下のsizeで指定できます。

2) 他のaggsでは可能な、doc_count順のソートができません。

3) termsは、keyの昇順・降順の並び指定が、orderパラメータで指定できます。 繰り返しますが、doc_count順のソートはできません。

aggregationsの無印のtermsとは少し違いますね。

参考:無印のterms Terms aggregation | Elasticsearch Guide [7.13] | Elastic

4) 上記の戻り値イメージでは割愛したのですが、戻り値のトップフィールドに「buckets」と同じ並びに、「after_key」というものがあります。 「after_key」には、buckets配列の最後のエントリと同じものが入っています。一見不思議な項目なのですが、検索結果が多い場合のページネーションの目印となります。 具体的には、この値を同じ条件のaggs-compositeクエリの「after」フィールドに指定してやることで、それ以降のエントリを取得することができます。

戻り値をフラットな形式にしてくれるとともに、1)、2)、3)のような挙動から察するに、他のAggregationに比べて、全パターンを取得したい/取得せざるを得ない要件において、ページネーションしながら使って良いよ、という方針かなと思いました。

Pythonのopen関数での改行の扱い備忘録(newlineオプション、readlines、readline...)

Pythonのopen組み込み関数では、「改行」に関して「優しい」仕様になっています。

しかし、私のような日曜プログラマは半年に1回ぐらいやっちまうのですが、稀にこの気遣い仕様を忘れて、ハマってしまうこともあるので、ここに備忘録として記しておきます。

まず、open 関数と、改行の振る舞いを制御するnewline引数については次のとおりです。

https://docs.python.org/ja/3/library/functions.html#open

※独り言:Pythonの公式サイトのリファレンスは、比較的ググラビリティが低い感じがする。

open組み込み関数のnewlineオプションの動作確認

公式リファレンスにしっかり書いてあるのですが、いかんせん忘れがちな挙動があります。

まずは、確認用スクリプト

for nl in  [None,'\r\n','\n','\r','']:
    lines = open('kaigyo.txt','r',encoding='utf-8',newline=nl).readlines()
    print('')
    print(f'newline={repr(nl)}')
    print(lines)

newline=None (newlineを指定しない場合。つまりデフォルト。)

改行っぽいやつを改行とみなして扱う。readlines()では、その環境の改行(os.linesep)に置き換えられたものが得られる。 (「ユニバーサル改行モード」と呼ぶらしい)

たまにこれの存在を忘れていて、荒れた改行コードのファイルを解析する時に落とし穴にハマっってしまう。

newline='\n' または '\r\n' または '\r'

指定のもののみ改行とみなして、readlines()でリストとして読み込む。特に改行の標準化などは行わない(元の入力のまま)。

newline=''

改行っぽいやつを改行とみなして扱う。readlines()では、元の入力のままの値が得られる。

IPythonでの確認

In [19]: ! echo -e "a,b,c\nd,e,f\r\ng,h,i\rj,kl" > kaigyo.txt        

In [20]: ! od -x kaigyo.txt 
    ...:                                                                                                                                                                   
0000000      2c61    2c62    0a63    2c64    2c65    0d66    670a    682c
0000020      692c    6a0d    6b2c    0a6c                                
0000030

In [21]: for nl in  [None,'\r\n','\n','\r','']: 
    ...:     lines = open('kaigyo.txt','r',encoding='utf-8',newline=nl).readlines() 
    ...:     print(f'newline={repr(nl)}') 
    ...:     print(lines) 
    ...:                                                                                                                                                                   
newline=None
['a,b,c\n', 'd,e,f\n', 'g,h,i\n', 'j,kl\n']  
※ \r\n  \r単独 \n単独の全てが改行として扱われる。readlines()では、筆者の環境では「\n」に標準化。

newline='\r\n'
['a,b,c\nd,e,f\r\n', 'g,h,i\rj,kl\n']
※ \r\nが改行扱い

newline='\n'
['a,b,c\n', 'd,e,f\r\n', 'g,h,i\rj,kl\n']
※\nが改行扱い

newline='\r'
['a,b,c\nd,e,f\r', '\ng,h,i\r', 'j,kl\n']
※\rが改行扱い

newline=''
['a,b,c\n', 'd,e,f\r\n', 'g,h,i\r', 'j,kl\n']
※改行っぽいやつは改行扱いされ、かつ標準化はしない。

Pandasである一覧データから別のキー一覧指定のレコードのみ抽出(の処理時間傾向の雑な計測)

ある一覧データから別の一覧データのキーに存在するものを抽出するという要件があります。

f:id:azotar:20210201200945p:plain

Pandasで言うと、上図のように、2つのDataFrameをmerge(つまり表データのJOINの論法ですね)することで、このようなデータを取得するというのが私の手グセなのですが、結合したDataFrameで後続の処理を行うことも多いので、この方法に特に疑問を持っていませんでした。

今更なのですが、この処理方式って自分が思うよりコスト高な処理なのではとふと頭をよぎったので、今回、同じようなことができる複数のロジック表記例について、雑に時間計測してみました。

前準備

import pandas as pd
import numpy as np
import random

np.random.seed(seed=64)
random.seed(64)
lista = list(map(str, list(np.random.rand(1000000))))


df_orig = pd.DataFrame([[i, j]
                        for i, j in zip(lista, lista)], columns=['a', 'b'])

foo = df_orig.set_index('a').to_dict(orient='index')

df = df_orig.copy()

listb = random.sample(lista, int(len(lista)/3))
df2 = pd.DataFrame([[i, j] for i, j in zip(listb, listb)], columns=['a', 'b'])
bar = df2.set_index('a').to_dict(orient='index')

比較例一覧

def a(df2):
    # mergeでフィルターです。
    # 生成したDataFrameの二次利用もふまえるとそんなに悪くない(個人的にはスキな)イディオムだと思うのですが....
    df3 = pd.merge(df, df2, how='inner', on='a')
    df3['a']


def b(listb):
    # isinです。
    cond = df['a'].isin(listb)
    df[cond]


def c(listb):
    # これは比較用の例で、Pandasは使われていません。
    for i in listb:
        foo.get(i)


def c2(bar):
    # これは比較用の例で、Pandasは使われていません。
    for i in foo.keys():
        bar.get(i)


def d():
    # mergeではなく、applyを使う案のひとつです。
    # DataFrame全体をターゲットにしているため、これは遅いです。
    df.apply(lambda s: foo.get(s['a']), axis=1)


def d2():
    # d()の要件であれば、最初にSeriesを取り出した方が良いですね。
    df['a'].apply(lambda x: foo.get(x))


def e():
    # mergeではなく、縦に結合して、重複(=キーが重なる。つまり、対象のデータである)を抜き出すことで、該当の要件を実現します。
    # この例では、全ての項目が同じものがあぶり出されます(ので、オーバースペックというか、ここまでの要件をぴったり実現したものではありません)
    df3 = pd.concat([df, df2])
    df3[df3.duplicated()]


def e2():
    # e()の実際の例
    # 個人の感想ですが、「関数型プログラミング」ぽいアプローチかも。 
    df3 = pd.concat([df, df2])
    df3[df3.duplicated('a')]

計測結果

In [6]: %time a(df2) 
   ...:  
   ...: %time b(listb) 
   ...:  
   ...: %time c(listb) 
   ...:  
   ...: %time c2(bar) 
   ...:  
   ...: %time d() 
   ...:  
   ...: %time d2() 
   ...:  
   ...: %time e() 
   ...:  
   ...: %time e2()                               
                                                                                                                          
CPU times: user 693 ms, sys: 84.7 ms, total: 777 ms
Wall time: 777 ms

CPU times: user 422 ms, sys: 4.78 ms, total: 427 ms
Wall time: 428 ms

CPU times: user 174 ms, sys: 289 µs, total: 174 ms
Wall time: 174 ms

CPU times: user 251 ms, sys: 363 µs, total: 252 ms
Wall time: 252 ms

CPU times: user 10.4 s, sys: 16.4 ms, total: 10.4 s
Wall time: 10.5 s

CPU times: user 431 ms, sys: 2.29 ms, total: 433 ms
Wall time: 435 ms

CPU times: user 732 ms, sys: 48.4 ms, total: 780 ms
Wall time: 784 ms

CPU times: user 420 ms, sys: 11.8 ms, total: 432 ms
Wall time: 433 ms

In [7]:                                                                                                                                                                    

所感(mergeでフィルターを代用するのはそれほど悪くないかも)

データバリエーションや複数回計測していない状況なので、あくまで所感ですが、個人的なフェイバリットのmerge方式(関数aの例)は、生成したDataFrameの二次利用に続くならそれほど悪くないと思いました。

抽出自体のみで良い場合は、isin(リスト)方式(関数bの例)がコードが簡潔であることも含めて、優勝かな。

Pythonのhttp.serverを使ってWebサーバの擬似不具合を再現するテスト用モックのようなドライバのような何か

Webクライアント側の例外処理のテストやそもそも挙動確認等で、サーバ側に多少奇妙な挙動、例えばHTTPヘッダーのある1項目を規定外のものにする、といったことをさせたいことはありませんか。

ただし、残りの99%部分は普通に動いて(いるようにみせかけて)欲しいというやつです。

サーバ側も管理範囲なら一時的に変更することが可能でしょうがそこまでやるのは本末転倒な場合も多く、これまた各種事情によりそもそもサーバ側の改変も難しい場合もしばしばあるでしょう。

さすがにカオスエンジニアリングは大掛かりすぎる、docker等活用もそれはそれで小回りがきかない感じというシチュエーションがあると思います。

ということで、私が知らないだけでイケてる方法はいくつもあるでしょうが、言語や軽めのフレームワークでほぼ標準で持っている動的Webサーバのライブラリでミニチュアサーバを立てるのが実のところ手っ取り早いというのが個人的な実感です。

特に、Pythonでは、標準ライブラリにhttp.serverというものがあります。

docs.python.org

http.serverは、リファレンスで下記のようにうたわれているものです。

警告 http.server is not recommended for production. It only implements basic security checks.

下手に高度なフレームワークなどを使ってしまうと、なんらかの安全弁が作動して奇妙な動作をさせる手間が増えてしまうこともあるでしょう。むしろ、http.serverのこの位置付けは、HTTPDの常駐デーモンから作成しなくて良いものの、それ以上は何も足さない、何も減らさないというところが、冒頭にあげたような用途にはちょうど良いように感じています。

ということで、http.serverを使った

  1. HTTPクライアントからのアクセス時に、「サーバ起動時に指定された秒数」だけ待機する(クライアントを待たせる)。
  2. N秒後に規定のコンテンツテキストを戻す。
  3. GETメソッド、POSTメソッドに対応

という、サーバプログラムの例です。

シェルのコンソールで、

python プログラム名.py 8080 固定で応答するコンテンツファイル  5

とすると、8080ポートで起動します。 また、各アクセスの際の応答までの待機秒数は5秒となります。

http.serverを使って奇妙な挙動をするサーバの例(Pythonプログラム)

from http.server import HTTPServer, BaseHTTPRequestHandler, ThreadingHTTPServer
import time
import sys


class MyHandler(BaseHTTPRequestHandler):

    def do_GET(self):
        time.sleep(WAIT)
        self.send_response(200)
        self.send_header('Content-type', CTYPE)
        self.end_headers()
        self.wfile.write(DUMMY_RES.encode())

    def do_POST(self):
        time.sleep(WAIT)
        self.send_response(200)
        self.send_header('Content-type', CTYPE)
        self.end_headers()
        self.wfile.write(DUMMY_RES.encode())


def run(server):
    server(('localhost', LISTEN_PORT), MyHandler).serve_forever()


if __name__ == '__main__':
    LISTEN_PORT = int(sys.argv[1])
    CTYPE = 'application/json'
    DUMMY_RES = open(sys.argv[2], 'r', encoding='utf-8').read()
    WAIT = int(sys.argv[3])
    # run(HTTPServer)
    run(ThreadingHTTPServer)

上記のプログラム例の補足です。 末尾のrunという関数呼び出し箇所あたりに注目ください。

http.server.HTTPServer、http.server.ThreadingHTTPServerでいうと、後者を使っています。 http.server.HTTPServerはコメントアウトしている方で、シングルスレッドで動作する*1ので、複数クライアントからのアクセスの場合には、サーバ側の応答が直列化される、より奇妙な挙動を再現しやすくなります。

<<この記事おわり>>

ちなみに、http.serverについては、少し前に自分用まとめをしてします。

itdepends.hateblo.jp

*1:本来の話の流れでは、ThreadingHTTPServerがマルチスレッドで動作する高度なバージョンですが、今回は逆の視点で捉えます。