Pythonの並列処理(マルチプロセス)
Rで並列処理といえばforeach
並列じゃない場合の繰り返しにも使いやすい上に気軽に並列化できる、神ライブラリと言って良いと思う。
(2020.12.18追記)最近はRay
という素晴らしいライブラリができて非常に簡単に分散処理ができるようになっていたのですね。
docs.ray.io
なので、これ以降は参考記事です。
そんな並列化がお手軽なRに比べると、Pythonの並列化はなんだか色々難しく敷居が高かった。
- そもそも並列処理と並行処理(マルチスレッド)という似たものがある
- でもマルチスレッドは、GILという排他ロックによってCPUを使う処理はあまり速くならないとか
concurrent.futures
というのが便利らしいがjupyterでは動かない- 正確には、並列化したい部分を .py にして外部読込が必要
しかし、いよいよ並列化をしないとおっつかない仕事ができてしまったので、concurrent.futures
と格闘しました。
どういう処理?
SPMD(Single Program Multiple Data)というタイプだと思いますが、要はX
は同じでy
を色々変えてモデルをいっぱい作るというもの。
コードと解説
並列処理される関数_modeling
def _modeling(myvar): print(f'{myvar} started at : ' + str(datetime.now())) # Load X X = load_npz('X.npz') # DataBase info cnxn = pyodbc.connect('DSN=*****') dfy = pd.read_sql_query( f""" Select ... """, cnxn) # Modeling model = MyClass(dfy, X, myvar) model.main() model.coef.to_csv(f'coef_{myvar}.csv') return({'myvar': model.myvar, 'auc_train': model.auc_train, 'auc_test': model.auc_test, 'val': model.val})
ポイント
- 関数
_modeling
は複数の引数を取れるが、大きいデータフレームの作成などは関数内に埋め込むべき- 引数として渡したオブジェクトはプロセス間通信でやり取りされる。
- やり取りできるものはpickle化できるオブジェクトに限られる。
- 疎行列などは、pickle化できるオブジェクトに変換していると思う(ドキュメントを読んでないが、メモリが爆死したので恐らくそう)
- 安全のためにも、全体で共通のデータフレームや疎行列であっても、いったんファイル(.csv, .npz)に保存して並列処理する関数の中で改めて読み込むほうがよさそう
- 同じ理由で、返り値もあまり重いものにしないほうが良い。(上の例も返り値は辞書)
プログラム本体
def main(): ... save_npz(X, 'X.npz') ... with futures.ProcessPoolExecutor() as executor: jobs = {} MAX_JOBS_IN_QUEUE = 20 vars_left = len(myvar_list) vars_iter = iter(myvar_list) result_df = pd.DataFrame() while vars_left: for this_var in vars_iter: job = executor.submit(_modeling, this_var) # 並列処理する対象とそれに対応する値 (処理の引数) を辞書で用意する jobs[job] = this_var # 処理上限数を超えた場合一旦止める if len(jobs) > MAX_JOBS_IN_QUEUE: break # futures.as_completed() は処理が済んだものから結果を返すジェネレータ for future in futures.as_completed(jobs): vars_left -= 1 # 処理結果を取得する result = future.result() this_var = jobs[future] # 結果(future)の削除 del jobs[future] # 結果を表示する print(f'{this_var} finished at : ' + str(datetime.now())) result_df = result_df.append(result, ignore_index=True) break
ポイント
- 最初のforループ
for this_var in vars_iter:
では処理上限数MAX_JOBS_IN_QUEUE
まで処理job
を作成- 際限なく作成するとメモリ不足を起こす
- 次のforループ
for future in futures.as_completed(jobs):
では処理が済んだものから後続処理を進めるdel jobs[future]
で済んだ処理を消し、メモリを確保break
を入れているので、基本逐次処理になる
vars_left
が0
でなければ処理対象(vars_iter
)が残っているのでwhileループに戻る。vars_iter
はmyvar_list
由来のイテレータなので、while2回目以降のループであっても続き(myvar_list
の残)から処理ができる