はじめに
以前、PythonのIO関連の非同期処理をうまくコントロールして全体スループットをあげるということで、次の記事を投稿しました。
今回はもっとストレートに、concurrent.futuresという並列タスク実行の標準パッケージがあるようなので、こちらに入門してみた...というメモです。
concurrent.futures 公式サイトと要点
公式サイトをそのつもりで読めば分かるのですが*1、プロセス版とスレッド版があります。
どちらも、concurrent.futures.Executor という抽象クラスで定義されたインタフェースで統一的に使えますので、実のところ、データ並列性、タスク並列性の高い処理をさせるという範囲であれば、公式等のサンプルコードを見よう見まねで実行してみると、こういうシンタックスのモデルなのねというのがすぐに分かりますし、使い始められそうです。逆にいうと、実際に実行してみた方が良いですね。
特に、メソッドとして覚えておいた方が良いのが、Futureを返す、submitと、計算結果として得られた「何か」のイテレータを戻すmapメソッドでしょうか。
どちらも、並列実行させたい「関数」と引数をパラメータに取ります。
concurrent.futuresに慣れるためのサンプルコード1
import concurrent.futures NUMS = range(5) def pow(n): return n * n def conc_map(executor): with executor() as e: it = e.map(pow, NUMS) for i in it: print(i) def conc_submit(executor): with executor() as e: fts = [e.submit(pow, i) for i in NUMS] for i in fts: print(i.result()) if __name__ == '__main__': P = concurrent.futures.ProcessPoolExecutor T = concurrent.futures.ThreadPoolExecutor # practice1 conc_submit(P) conc_submit(T) conc_map(P) conc_map(T)
実行結果
以下、実行結果です。独立した軽い演算を多重化しているだけなので、並列・並行・非同期になっているかは分かりづらいですが...
なお、Future、イテレータに戻り値を預けるスタイルなので、前者は「Future.result()」で戻り値を待ち合わせ取得できますし、後者は「next」でイテレーションできますので、戻り値を使いたいタイミングで、イテレータを回すことで良いようです。
0 1 4 9 16 0 1 4 9 16 0 1 4 9 16 0 1 4 9 16
ここで、
concurrent.futures.ProcessPoolExecutor
concurrent.futures.ThreadPoolExecutor
がそれぞれ名前のとおりのExecutorなのですが、この程度の例だと違いは見えてきませんね。一方で、プログラミングスタイルというか、シンタックスとしては、両者は統一的なやり方で取り回せることが見て取れます。
なお、asyncio含めて、concurrent.futuresのプロセス版、スレッド版の兄弟の使いどころから見た使い分けは、次の書籍で簡潔に述べられていますので、頭の整理に参考にさせていただきました*2。
concurrent.futuresに慣れるためのサンプルコード2
サンプルコードその2です。
次のポイントを確かめたくて、「map」に絞りましたが、少し例を複雑にしていました。 (最後に待ち合わせはあるものの独立性の高い処理を多重化しているので、そんなに難しくはありません。)
1) 複数の引数を取る関数を多重化したい場合の表現方法 2) 多重度の制御(max_workersパラメータ)
import urllib.request import concurrent.futures def load_url(url, second_arg): print(second_arg) with urllib.request.urlopen(url) as conn: return conn.read() def conc_practice_map(task_num, executor, max_workers): TASK_NUM = task_num URLS = ['http://localhost:8081/'] * TASK_NUM bar = ['dummy_' + str(i) for i in range(TASK_NUM)] with executor(max_workers=max_workers) as e: it = e.map(load_url, URLS, bar) for i in it: print(i) if __name__ == '__main__': P = concurrent.futures.ProcessPoolExecutor T = concurrent.futures.ThreadPoolExecutor # practice2 TN = 5 MW = 3 conc_practice_map(TN, P, MW)
「http://localhost:8081/」にHTTPアクセスする(それ自体はほとんど意味がない)プログラムです。 手元の環境では、http://localhost:8081/ では、とにかく5秒経過してから、意味のないJSONを応答するサーバを起動しています。
このエンドポイントに、変数TNで定義される回数のアクセスを、最大同時実行数を変数MWで指定された値として、多重度をコントロールしながらも、並列実行するという想定のものになっています。
結果は割愛しますが、TNやMWの値を変えたりして、挙動や待ち合わせ具合を確認することができます。
なお、TN=5、MW=3 で実験したところ、走行時刻をtimeコマンドで計測するとおおよそ10秒強でした、3並列で2巡(1巡あたりサーバ側の5秒縛り)なので、おおよそ10秒という計算に合いそうです。
この項以上
追伸
上記の「 http://localhost:8081/」 では、とにかく5秒経過してから、意味のないJSONを応答するサーバ は次の記事で遊んでみたものを利用しています。