日々のあれこれφ(..)

もっぱら壁打ち

【Python】生成したデータを並列で捌いていく

去年くらいに自分が書いたコードが大変読みにくいものだったので、並列処理再々入門しました。

作りたかった処理としては以下のようなものです。

  • n~mまでのデータの処理を並列で行う
    • 元々はHTTPリクエストすることを想定していました。このリポジトリでは汎用的にするためにprint出力に留めています。
  • リクエストボディで必要になるデータをQueueに入れる
  • どこまで処理が完了したかログを出す

最終成果物はmain.pyのものです。後で思い出しやすいように、要素を細分化したサンプルコードを合わせて残しました。

STEP1. multiprocessで並列処理を行う

組み込みモジュールmultiprocessingを使って行うときの書き方です。

main1.py

  • Process()で子プロセスのインスタンスを生成する。その際に引数に処理させる関数とその関数に渡す引数をTupleで渡す
  • ↑のインスタンスをstart()で処理開始
  • join()は子プロセスが終了するまでメインの処理を一時停止させる

STEP2. Queueを使ってプロセス間でデータを共有する

multiprocessingモジュールではプロセス間通信の手段が二つ用意されています。

  • Queue
  • Pipe:ソケットのような双方向通信チャネル

今回は双方向にする必要はないのでQueueを用いて書きました。

main2.py

  • def worker : 子プロセスが行う処理を定義。キューから取り出したアイテムで計算処理を行う
    • def calculate : 子プロセスが計算を行うときに呼び出すメソッド
    • def mul : calculateに渡せる関数の一つ。名前の通り引数で渡された数を掛け算する
  • def main :
    • タスク(今回の場合は「(mul, (i, 2))」形式のもの)を生成し、キューに詰む
    • キューを作成したり、子プロセスを生成したり
    • 最後にqueueに”STOP”をputすることで、子プロセスが処理を終えるようになっている

STEP3. QueueHandlerを使って複数プロセスから単一ファイルへログを記録する

loggingハンドラーのQueueHandlerを使って書きました。

main3.py

  • def listener_configurer: RotatingFileHandlerの設定
  • def listener_process : ログ用キューに積まれたログを取り出し、ファイルに出力していく。Noneを受け取るまで永遠に動く
  • def worker_configurer : workerで使うQueueHandlerの設定
  • def worker_process : 子プロセスの処理。今回はランダムにログメッセージを生成しログを吐いている
  • def main : 全体と同じようにQueueを生成し、 子プロセスを生成し、最後にqueueにNoneを渡して処理を終わらせるようにしている

まとめ. multiprocess, Queue, QueueHandlerを使った並列処理

main.py

  • def main :
    • ログ用のキューとworkerで処理するデータを詰むキューの二つを作成し、workerに渡す。
    • 子プロセスを起動した後にキューに生成したデータを積んでいき(def generate_data)、全て終わったらNoneを渡す
  • def worker_process : lister_processと同じようにキューのタスクを待ち続けるworkerに。
  • def listener_configurerdef listener_processdef worker_configurer: step3のコードを踏襲したもの

github.com

join()の位置を間違えて「処理が終わったのにメインプロセスが終了しない」「キューの中身を捌き切る前に子プロセスが処理する」みたいなところで少しハマってしまいました...。

参考