【Python】生成したデータを並列で捌いていく
去年くらいに自分が書いたコードが大変読みにくいものだったので、並列処理再々入門しました。
作りたかった処理としては以下のようなものです。
- n~mまでのデータの処理を並列で行う
- リクエストボディで必要になるデータをQueueに入れる
- どこまで処理が完了したかログを出す
最終成果物はmain.pyのものです。後で思い出しやすいように、要素を細分化したサンプルコードを合わせて残しました。
- STEP1. multiprocessで並列処理を行う
- STEP2. Queueを使ってプロセス間でデータを共有する
- STEP3. QueueHandlerを使って複数プロセスから単一ファイルへログを記録する
- まとめ. multiprocess, Queue, QueueHandlerを使った並列処理
- 参考
STEP1. multiprocessで並列処理を行う
組み込みモジュールmultiprocessingを使って行うときの書き方です。
- Process()で子プロセスのインスタンスを生成する。その際に引数に処理させる関数とその関数に渡す引数をTupleで渡す
- ↑のインスタンスをstart()で処理開始
- join()は子プロセスが終了するまでメインの処理を一時停止させる
STEP2. Queueを使ってプロセス間でデータを共有する
multiprocessingモジュールではプロセス間通信の手段が二つ用意されています。
- Queue
- Pipe:ソケットのような双方向通信チャネル
今回は双方向にする必要はないのでQueueを用いて書きました。
def worker
: 子プロセスが行う処理を定義。キューから取り出したアイテムで計算処理を行うdef calculate
: 子プロセスが計算を行うときに呼び出すメソッドdef mul
: calculateに渡せる関数の一つ。名前の通り引数で渡された数を掛け算する
def main
:- タスク(今回の場合は「
(mul, (i, 2))
」形式のもの)を生成し、キューに詰む - キューを作成したり、子プロセスを生成したり
- 最後にqueueに”STOP”をputすることで、子プロセスが処理を終えるようになっている
- タスク(今回の場合は「
STEP3. QueueHandlerを使って複数プロセスから単一ファイルへログを記録する
loggingハンドラーのQueueHandlerを使って書きました。
def listener_configurer
: RotatingFileHandlerの設定def listener_process
: ログ用キューに積まれたログを取り出し、ファイルに出力していく。Noneを受け取るまで永遠に動くdef worker_configurer
: workerで使うQueueHandlerの設定def worker_process
: 子プロセスの処理。今回はランダムにログメッセージを生成しログを吐いているdef main
: 全体と同じようにQueueを生成し、 子プロセスを生成し、最後にqueueにNoneを渡して処理を終わらせるようにしている
まとめ. multiprocess, Queue, QueueHandlerを使った並列処理
def main
:- ログ用のキューとworkerで処理するデータを詰むキューの二つを作成し、workerに渡す。
- 子プロセスを起動した後にキューに生成したデータを積んでいき(
def generate_data
)、全て終わったらNoneを渡す
def worker_process
: lister_processと同じようにキューのタスクを待ち続けるworkerに。def listener_configurer
、def listener_process
、def worker_configurer
: step3のコードを踏襲したもの
join()の位置を間違えて「処理が終わったのにメインプロセスが終了しない」「キューの中身を捌き切る前に子プロセスが処理する」みたいなところで少しハマってしまいました...。