R, Python, DB 備忘録

データベースとか、jupyter(Python)、Rとか色々

Pythonの並列処理(マルチプロセス)

Rで並列処理といえばforeach
並列じゃない場合の繰り返しにも使いやすい上に気軽に並列化できる、神ライブラリと言って良いと思う。

(2020.12.18追記)最近はRayという素晴らしいライブラリができて非常に簡単に分散処理ができるようになっていたのですね。
docs.ray.io

なので、これ以降は参考記事です。


そんな並列化がお手軽なRに比べると、Pythonの並列化はなんだか色々難しく敷居が高かった。

  • そもそも並列処理と並行処理(マルチスレッド)という似たものがある
    • でもマルチスレッドは、GILという排他ロックによってCPUを使う処理はあまり速くならないとか
  • concurrent.futuresというのが便利らしいがjupyterでは動かない
    • 正確には、並列化したい部分を .py にして外部読込が必要

しかし、いよいよ並列化をしないとおっつかない仕事ができてしまったので、concurrent.futuresと格闘しました。

どういう処理?

SPMD(Single Program Multiple Data)というタイプだと思いますが、要はXは同じでyを色々変えてモデルをいっぱい作るというもの。

コードと解説

並列処理される関数_modeling

def _modeling(myvar):
    print(f'{myvar} started at : ' + str(datetime.now()))
    # Load X
    X = load_npz('X.npz')
    # DataBase info
    cnxn = pyodbc.connect('DSN=*****')
    dfy = pd.read_sql_query( f""" Select ... """, cnxn)

    # Modeling
    model = MyClass(dfy, X, myvar)
    model.main()
    
    model.coef.to_csv(f'coef_{myvar}.csv')
    return({'myvar': model.myvar, 'auc_train': model.auc_train, 'auc_test': model.auc_test, 'val': model.val})
ポイント
  • 関数_modelingは複数の引数を取れるが、大きいデータフレームの作成などは関数内に埋め込むべき
    • 引数として渡したオブジェクトはプロセス間通信でやり取りされる。
    • やり取りできるものはpickle化できるオブジェクトに限られる。
    • 疎行列などは、pickle化できるオブジェクトに変換していると思う(ドキュメントを読んでないが、メモリが爆死したので恐らくそう)
    • 安全のためにも、全体で共通のデータフレームや疎行列であっても、いったんファイル(.csv, .npz)に保存して並列処理する関数の中で改めて読み込むほうがよさそう
  • 同じ理由で、返り値もあまり重いものにしないほうが良い。(上の例も返り値は辞書)

プログラム本体

def main():
    ...
    save_npz(X, 'X.npz')
    ...
    with futures.ProcessPoolExecutor() as executor:
        jobs = {}
        MAX_JOBS_IN_QUEUE = 20
        vars_left = len(myvar_list)
        vars_iter = iter(myvar_list)
        
        result_df = pd.DataFrame()
        while vars_left:
            for this_var in vars_iter:
                job = executor.submit(_modeling, this_var)
                # 並列処理する対象とそれに対応する値 (処理の引数) を辞書で用意する
                jobs[job] = this_var
                # 処理上限数を超えた場合一旦止める
                if len(jobs) > MAX_JOBS_IN_QUEUE:
                    break
            # futures.as_completed() は処理が済んだものから結果を返すジェネレータ
            for future in futures.as_completed(jobs):
                vars_left -= 1
                # 処理結果を取得する
                result = future.result()
                this_var = jobs[future]
                
                # 結果(future)の削除
                del jobs[future]
                
                # 結果を表示する
                print(f'{this_var} finished at : ' + str(datetime.now()))
                result_df = result_df.append(result, ignore_index=True)
                break
ポイント
  • 最初のforループfor this_var in vars_iter:では処理上限数MAX_JOBS_IN_QUEUEまで処理jobを作成
    • 際限なく作成するとメモリ不足を起こす
  • 次のforループfor future in futures.as_completed(jobs):では処理が済んだものから後続処理を進める
    • del jobs[future]で済んだ処理を消し、メモリを確保
    • breakを入れているので、基本逐次処理になる
  • vars_left0でなければ処理対象(vars_iter)が残っているのでwhileループに戻る。
    • vars_itermyvar_list由来のイテレータなので、while2回目以降のループであっても続き(myvar_listの残)から処理ができる

プログラム実行

コードスクリプトmyprogram.pyとして

(myenv-*****) > python myprogram.py     # 対象の仮想環境を有効化しておくことを忘れずに。