はてだBlog(仮称)

私的なブログど真ん中のつもりでしたが、気づけばWebサイト系のアプリケーション開発周りで感じたこと寄りの自分メモなどをつれづれ述べています。2020年6月現在、Elasticsearch、pandas、CMSなどに関する話題が多めです。...ですが、だんだんとより私的なプログラムのスニペット置き場になりつつあります。ブログで述べている内容は所属組織で販売している製品などに関するものではなく、また所属する組織の見解を代表するものではありません。

最近のPythonでのコンカレント(並列)の非同期処理 async/await ・asyncioについて(ver 3.8で確認)

Pythonの非同期処理については、いろいろ歴史・経緯があるようです。

しかしながら、皆さんのおかげで、部外者が安易に言うのもはばかられますが、Python3.7あたりから、私のような日曜エンジニアでもおいしいところにありつけるようになってきた様子だったので、シンタックス入門例を自分用にまとめてみました。

docs.python.org

ちなみに、非同期処理が登場する要件としてはいろいろなケースがあると思いますが、本記事では、時間のかかるIO処理を並行化するユースケースを念頭においています。さらに、サーバとクライアントでいえばどちらかといえば「使う」側の後者における非同期処理に重心があります。

また、記事作成にあたり実際はいろいろネット上の文献等を参照させていただいてはいるのですが、どこのどれにポインタを貼るべきか悩ましいところでした。

よって、予備知識や背景を勉強させていただいたことには感謝しつつ、ここではあえて他の参考リンクはしませんでした。

なお、本記事の例はPython3.8で動作確認しています。

上記の公式リファレンスは自分なりに熟読しましたが、如何せん、非同期処理は、挙動を観測するためにトリックを入れるとそれの都合で確認したい挙動が変わってしまう(かのようにログ出力されることもある)ため、Python3.8で確認したことは胸をはって宣言できるのですが、Python自体のソースコードを追ったわけではありませんので、多少割り引いてご覧ください。また、エラーハンドリングは省略していますので、言うまでもないですが、自己責任でお願いします*1

(ex.1) コルーチンオブジェクトを戻す非同期関数・メソッドを同期的に処理する

まずは、柔道で立ち技の技術を活かすために寝技に通じる、ブレーキの技術があればアグレッシブな運転ができるというところで、あえて非同期処理を同期的に扱う例から入ります。

一例として、Pythonの非同期処理を取り扱うサードパーティ他のライブラリを使用することになりました、という例をイメージください。

さて、このサードパーティ関数・メソッド・APIは、コルーチンオブジェクトを戻します... とリファレンスには記載されています*2

Pythonにおけるコルーチン・コルーチンオブジェクトが何かは、ボロが出るので深追いしませんが、どうやらこの関数で何かデータを取得したり変換した結果そのものではなく、なんらかの代理のオブジェクトのようなものを戻してくれるらしいし、この存在を認めることで、どうやら非同期処理をプログラム記述上うまく取り扱えるんだろうなということは状況証拠的に分かります。

そのような想定をしつつ、関数に不慣れなこともあり、非同期処理ではありながらも同期的に処理することにして、それ以外の本来の目的に集中しましょうというプログラム設計アプローチをとることもあるよね...

という場合に、このサードパーティ関数の実際の戻り値を取得する、また、プログラムの進行を非同期処理が応答を戻すまで確実に待機するための魔法のコトバが、「await」です。

具体的なawaitの例は次のとおりです。

■awaitで外部の非同期ライブラリを待ち合わせる例(hoge1.py)

import asyncio

async def gaibuno_asyncfunc(x, y):
    """
    擬似非同期関数です。
    データベースなどを検索して(見ての通りしてませんが)、最終的には値を戻す関数に見立ててください。
 実際は、
    import  gaibuno_asyncfunc
    している体としてご覧ください。
    """
    print(F'引数{x},{y}で呼び出されました。')
    return x + y


async def main():
    cr = gaibuno_asyncfunc(3, 10)
    print(cr)
    #  ↓
    # <coroutine object echo at 0x7ff420035640 >

    result = await cr
    print(result)
    #  ↓
    #  13


asyncio.run(main())

hoge1.pyの実行結果

<coroutine object echo at 0x7f8e38105640>
引数3,10で呼び出されました。
13

gaibuno_asyncfuncは、今回のストーリーにおける、サードパーティの「コルーチンオブジェクト」を戻すと言われている関数だと思ってください。コードの見た目が「return x + y」になっていることをうっすら念頭におきつつ、厳密なところはあまり気にしないでください。

asyncio.run(foo())の存在と、main()にasyncが付いていることはひとまず気にしないでください。

ポイント、というか目を慣らして欲しいのは次の2点です。

  1. 擬似外部サードパーティの非同期関数という体のgaibuno_asyncfuncの戻り値は、元のコードでは、「x + y」を戻すようなものでも、コルーチンオブジェクトになっていること。
  2. コルーチンオブジェクトを格納した crという変数に対して、await式を適用すると、x + yらしき値の13が得られていること。また、非同期か並行だったかというところはともかく(この例だけでは実際ははっきりしませんが)awaitで待ち合わせしたようである。

というところです。

なお、「コルーチンオブジェクトが戻る」ということを見せたくてこのような例にしましたが、awaitで外部の非同期ライブラリをあえて同期的に実行したい場合は、実際は次のように、関数呼び出しの場所でawaitで捕まえるようなコーディングとしましょう。

async def main():
    result = await gaibuno_asyncfunc(3, 10)
    print(result)

(ex.2) 非同期処理をレスポンス向上に活かす際の例

前項では、非同期処理を同期的にする例としましたが、ここからが本題です。

あらためて気持ちを棚卸ししますが、準備が整ったら非同期処理は開始して欲しいし、複数の非同期処理があればこれらもそれぞれの内部で時間のかかる外部リソースのIO待ちになれば違いに切り替えながら他の仕事をして欲しいし、 並行して走っている複数の非同期処理のうち、ある処理については完了し次第、得られた値を使って次の演算をしたいよね、という非同期処理ならではのシーンをイメージします。

例えば、なんちゃって図で書くと、深く考えずに直列化すると(処理の流れは分かりやすいものの)下記図の上の図で登場人物分の処理時間を要してしまうものの、ホントの気持ちは下部の図のようにうまくやりたいよね、という例です。

f:id:azotar:20210113225344p:plain

このような用途については、ハイレベル、ローレベル合わせていくつか方法があるようです。

Python3.7あたりでは、ハイレベルのAPIが整理されており、次のような例がおおよそ、上記の図の下の方の図のような挙動になるコーディング例です。

■随時非同期処理をスタートさせて並行処理しながら必要なタイミングで戻り値を待ち合わせ取得して利用する例(hoge2.py)

import asyncio


async def some_asyncfunc(x):
    print(F'引数{x}で呼び出されました。ただし、すぐに完了するとは限りません。')
    return x


async def mainConcurrent():
    task1 = asyncio.create_task(some_asyncfunc(333))
    task2 = asyncio.create_task(some_asyncfunc(777))

    r1 = await task1
    print(F'r1:{r1}と一緒に出力したい')
    r2 = await task2
    print(F'r1:{r1},r2:{r2}と一緒に出力したい')

asyncio.run(mainConcurrent())

hoge2.pyの実行例

引数333で呼び出されました。ただし、すぐに完了するとは限りません。
引数777で呼び出されました。ただし、すぐに完了するとは限りません。
r1:333と一緒に出力したい
r1:333,r2:777と一緒に出力したい

ここでは、「タスク」という処理モデル(処理単位?)を抽象化したものがPythonの言語側で整理されており、これが登場します。

ポイントとしては、asyncio.create_task(コルーチンオブジェクト)という形で、「タスク」を作りなさいという命令をコールすると、その時点で引数で渡したコルーチンオブジェクトが取り扱う非同期処理がスケジュール化(言語側に任されることになりますが、つまり処理がスタート)されます。

ここで、asyncio.create_taskは、Taskオブジェクトを戻します。例によって、Taskオブジェクトはそのままでは、なんらかの値を見せてくれませんが、コルーチンオブジェクトやこの記事では紹介しないPythonのFeatureのような「awaitable」なオブジェクトであり、つまるところ、awaitで終了の待機と非同期処理で取得される値を得ることができます。

注:コルーチンを戻す関数を呼び出ししただけでは、その非同期処理はスタートしない(らしい)

実はex.1もそうでしたが、コルーチンを戻す関数を呼び出し記述しただけでは、その非同期処理はスタートしないことに注意してください。

言語に「スタートしても良いよ」ということを伝えるには、create_taskや今回はご紹介していないローレベルのイベントループ制御のAPIにコルーチンオブジェクトを渡してやる必要があります。

例えば、次のプログラム例では、

async def main():
    cr1 = some_asyncfunc(333)
    cr2 = some_asyncfunc(777)

    r1 = await cr1
    print(F'r1:{r1}と一緒に出力したい')
    r2 = await cr2
    print(F'r1:{r1},r2:{r2}と一緒に出力したい')

asyncio.run(main())

次のような出力となり、

引数333で呼び出されました。ただし、すぐに完了するとは限りません。
333 r1と一緒に出力したい
引数777で呼び出されました。ただし、すぐに完了するとは限りません。
333 r1と一緒に出力したい

awaitで「終了を待ち合わせする」記述が現れた時点から非同期処理が走行し始めて、完了するまで走行するという動きになります。

(ex.3) ある程度の(例えば同じような性質の複数のリソースへの情報取得の)非同期処理を複数を並行に走らせて、それらの取得値が出揃ったら次のステップの処理をしたい

ex.2は、どちらかといえば時間のかかる処理を並行で随時開始させておきたいというところに関心がありましたが、こちらは、並列で走らせたいがそれらの間に特に順序性等はないが、全部そろうのを待つという例です。

こんな↓シナリオ例です。

f:id:azotar:20210113225928p:plain

私なぞ深く考えないヒトは、この例について、普通(?)にforループでawaitしてまわれば良いのでは?とも思ったりするのですが、そうすると実際のところ直列化されてしまいます。

やはり、このような要件は頻出なのか、asyncio.gatherというAPIが用意されています。JavaScriptにおける、Promise.allですね。

また、これはそのまま定型として覚えても良いと思われますが、「await asyncio.gather(タスクオブジェクトの一覧)」としてawaitを付けて、結果のリストを取得するようにしています。

■(この例では並列(parallel)ではないが)パラパラと随時非同期処理を多数走行させて総取りする例(hoge3.py)

import asyncio


async def myfunc(a):
    """
    擬似非同期処理
    """    
    return a


async def main():
    tsks = [asyncio.create_task(myfunc(i)) for i in [0, 1, 2, 3, 4]]
    # 通常は、res = await asyncio.gather(*tsks) で良いが、戻り値の関係が分かりやすいように、
    # 以下では引数の順番で遊んでみている。
    res = await asyncio.gather(tsks[4], tsks[2], tsks[1], tsks[0], tsks[3])
    print(res)

asyncio.run(main())

hoge3.pyの実行結果

[4, 2, 1, 0, 3]

さいごに(【補足】async def、asyncio.run(foo())について)

最後にここまでの例ではあえて説明を飛ばした事項について補足など。

asyncio

ご紹介が最後になりましたが、asyncioさんです。

忘れずにimport してください。

docs.python.org

この記事は、上記リンク中の「高レベル API 」セクションの「コルーチンとTask」の内容にそっています。

ちなみに、記事のex.3の例をもう少し複雑にしたような例だと、「キュー」を使うことになるんだと思います。

async def

async def foo(xxx)で宣言した関数は、「foo(xxx)」で記述された際に、コルーチンオブジェクトを戻り値として戻すことで、非同期処理(そして実際は非同期でなくても)を抽象化できる上に、awaitで元々の戻り値を取得する分かりやすいコーディングができる... ものです。

...と私は(使う側の立場としては)理解しているのですが、このままの説明は公式リファレンスではされていません。PEPなどの読み込みやPythonへの深い理解が足らないのかも。

ただ、入門時は一旦このようにとらえておいて良いのではと感じています。

asyncio.run(foo())

asyncio.runは、コルーチンオブジェクトを引数にとって、その非同期処理を走行させるためのAPIです。

クライアント系のプログラムであれば、複数の非同期処理を取り扱うmain関数(これ自体もasync defで宣言することになり、プログラム上のreturn値によらず、main()という記述の戻り値はコルーチンになる)を定義し、これを実行プログラムの最後に呼び出すことになりますが、そこでasyncio.run(main())とすると、asyncioモジュール黎明期によく見られたイベントループを意識した低レベルのコーディングをしなくて済む... という意味で便利なAPIです。

ということで、Python 3.7以降で合流した方はこれをおまじないとしてまずは慣れることを優先した方が良いと思いますし、asyncio.runおまじないで十分な場合はこれ以上複雑なことはしないというのも立派な戦略かと思います。

https://docs.python.org/ja/3.8/library/asyncio-task.html#asyncio.run

*1:というほど難しい例は入れていませんが念のため。

*2:こいつ

docs.aiohttp.org

ですね。