はてだBlog(仮称)

私的なブログど真ん中のつもりでしたが、気づけばWebサイト系のアプリケーション開発周りで感じたこと寄りの自分メモなどをつれづれ述べています。2020年6月現在、Elasticsearch、pandas、CMSなどに関する話題が多めです。...ですが、だんだんとより私的なプログラムのスニペット置き場になりつつあります。ブログで述べている内容は所属組織で販売している製品などに関するものではなく、また所属する組織の見解を代表するものではありません。

Pythonのconcurrent.futuresのシンタックスに入門してみた(並列タスク実行)

はじめに

以前、PythonのIO関連の非同期処理をうまくコントロールして全体スループットをあげるということで、次の記事を投稿しました。

itdepends.hateblo.jp

今回はもっとストレートに、concurrent.futuresという並列タスク実行の標準パッケージがあるようなので、こちらに入門してみた...というメモです。

concurrent.futures 公式サイトと要点

docs.python.org

公式サイトをそのつもりで読めば分かるのですが*1、プロセス版とスレッド版があります。

どちらも、concurrent.futures.Executor という抽象クラスで定義されたインタフェースで統一的に使えますので、実のところ、データ並列性、タスク並列性の高い処理をさせるという範囲であれば、公式等のサンプルコードを見よう見まねで実行してみると、こういうシンタックスのモデルなのねというのがすぐに分かりますし、使い始められそうです。逆にいうと、実際に実行してみた方が良いですね。

特に、メソッドとして覚えておいた方が良いのが、Futureを返す、submitと、計算結果として得られた「何か」のイテレータを戻すmapメソッドでしょうか。

どちらも、並列実行させたい「関数」と引数をパラメータに取ります。

concurrent.futuresに慣れるためのサンプルコード1

import concurrent.futures

NUMS = range(5)


def pow(n):
    return n * n


def conc_map(executor):
    with executor() as e:
        it = e.map(pow, NUMS)
        for i in it:
            print(i)


def conc_submit(executor):
    with executor() as e:
        fts = [e.submit(pow, i) for i in NUMS]
        for i in fts:
            print(i.result())


if __name__ == '__main__':
    P = concurrent.futures.ProcessPoolExecutor
    T = concurrent.futures.ThreadPoolExecutor
    # practice1
    conc_submit(P)
    conc_submit(T)
    conc_map(P)
    conc_map(T)

実行結果

以下、実行結果です。独立した軽い演算を多重化しているだけなので、並列・並行・非同期になっているかは分かりづらいですが...

なお、Future、イテレータに戻り値を預けるスタイルなので、前者は「Future.result()」で戻り値を待ち合わせ取得できますし、後者は「next」でイテレーションできますので、戻り値を使いたいタイミングで、イテレータを回すことで良いようです。

0
1
4
9
16
0
1
4
9
16
0
1
4
9
16
0
1
4
9
16

ここで、

concurrent.futures.ProcessPoolExecutor

concurrent.futures.ThreadPoolExecutor

がそれぞれ名前のとおりのExecutorなのですが、この程度の例だと違いは見えてきませんね。一方で、プログラミングスタイルというか、シンタックスとしては、両者は統一的なやり方で取り回せることが見て取れます。

なお、asyncio含めて、concurrent.futuresのプロセス版、スレッド版の兄弟の使いどころから見た使い分けは、次の書籍で簡潔に述べられていますので、頭の整理に参考にさせていただきました*2

Python実践入門 ── 言語の力を引き出し、開発効率を高める WEB+DB PRESS plus

concurrent.futuresに慣れるためのサンプルコード2

サンプルコードその2です。

次のポイントを確かめたくて、「map」に絞りましたが、少し例を複雑にしていました。 (最後に待ち合わせはあるものの独立性の高い処理を多重化しているので、そんなに難しくはありません。)

1) 複数の引数を取る関数を多重化したい場合の表現方法 2) 多重度の制御(max_workersパラメータ)

import urllib.request
import concurrent.futures

def load_url(url, second_arg):
    print(second_arg)
    with urllib.request.urlopen(url) as conn:
        return conn.read()


def conc_practice_map(task_num, executor, max_workers):
    TASK_NUM = task_num
    URLS = ['http://localhost:8081/'] * TASK_NUM
    bar = ['dummy_' + str(i) for i in range(TASK_NUM)]
    with executor(max_workers=max_workers) as e:
        it = e.map(load_url, URLS, bar)
        for i in it:
            print(i)


if __name__ == '__main__':
    P = concurrent.futures.ProcessPoolExecutor
    T = concurrent.futures.ThreadPoolExecutor
    # practice2
    TN = 5
    MW = 3
    conc_practice_map(TN, P, MW)

http://localhost:8081/」にHTTPアクセスする(それ自体はほとんど意味がない)プログラムです。 手元の環境では、http://localhost:8081/ では、とにかく5秒経過してから、意味のないJSONを応答するサーバを起動しています。

このエンドポイントに、変数TNで定義される回数のアクセスを、最大同時実行数を変数MWで指定された値として、多重度をコントロールしながらも、並列実行するという想定のものになっています。

結果は割愛しますが、TNやMWの値を変えたりして、挙動や待ち合わせ具合を確認することができます。

なお、TN=5、MW=3 で実験したところ、走行時刻をtimeコマンドで計測するとおおよそ10秒強でした、3並列で2巡(1巡あたりサーバ側の5秒縛り)なので、おおよそ10秒という計算に合いそうです。

この項以上

追伸

上記の「 http://localhost:8081/」 では、とにかく5秒経過してから、意味のないJSONを応答するサーバ は次の記事で遊んでみたものを利用しています。

itdepends.hateblo.jp

*1:私は腹落ちするまで時間を要しましたが...

*2:...というところで、この記事ではそこにはふれません。スレッド版は有名(?)な、「GIL」の話がからんでくるようです。