Python+jupyterの並列処理
つい最近Pythonの並列処理について書いたばかりなんですが
rpy.hatenablog.com
jupyter上で並列処理ができるipyparallel
を使ってみたので、導入から簡単な使い方までまとめました。
ipyparallel
は、単一マシン上のマルチプロセスだけでなく、分散コンピューティングにも対応(むしろそっちがメイン?)
※ただ、結論から言うとナシかなあと。あまりお手軽でもないし、前回書いた方法と比較しても、jupyterで動く以上のメリットを感じませんでした。
※※magic commandなどにも慣れていけばまた違うのかも。
前提
仮想環境myenv
で動かすプログラムについて、並列化したい。
インストール
仮想環境にipyparallel
をインストール
(myenv-*****)>pipenv install ipyparallel
並列処理エンジンの稼働
仮想環境でipcontroller
とipengine
(myenv-*****)>ipcontroller 2020-07-15 12:37:06.256 [IPControllerApp] Hub listening on tcp://127.0.0.1:54471 for registration. 2020-07-15 12:37:06.306 [IPControllerApp] Hub using DB backend: 'DictDB' 2020-07-15 12:37:06.564 [IPControllerApp] hub::created hub 2020-07-15 12:37:06.565 [IPControllerApp] writing connection info to C:\Users\~~~~~\.ipython\profile_default\security\ipcontroller-client.json 2020-07-15 12:37:06.568 [IPControllerApp] writing connection info to C:\Users\~~~~~\.ipython\profile_default\security\ipcontroller-engine.json 2020-07-15 12:37:06.572 [IPControllerApp] task::using Python leastload Task scheduler 2020-07-15 12:37:06.573 [IPControllerApp] Heartmonitor started 2020-07-15 12:37:06.647 [IPControllerApp] Creating pid file: C:\Users\~~~~~\.ipython\profile_default\pid\ipcontroller.pid 2020-07-15 12:37:09.210 [scheduler] Scheduler started [leastload] 2020-07-15 12:37:09.212 [IPControllerApp] client::client b'\x00\x80\x00\x00)' requested 'connection_request' 2020-07-15 12:37:09.213 [IPControllerApp] client::client [b'\x00\x80\x00\x00)'] connected 2020-07-15 12:38:33.386 [IPControllerApp] client::client b'00a55305-095b357100421a90794cf1d4' requested 'registration_request' 2020-07-15 12:38:36.578 [IPControllerApp] registration::finished registering engine 0:00a55305-095b357100421a90794cf1d4
ipengine
は、稼働させる数だけ別窓を開いて実行
(myenv-*****)>ipengine 2020-07-15 14:21:26.901 [IPEngineApp] Loading url_file 'C:\\Users\\~~~~~\\.ipython\\profile_default\\security\\ipcontroller-engine.json' 2020-07-15 14:21:26.911 [IPEngineApp] Registering with controller at tcp://127.0.0.1:54471 2020-07-15 14:21:27.193 [IPEngineApp] Starting to monitor the heartbeat signal from the hub every 3010 ms. 2020-07-15 14:21:27.228 [IPEngineApp] Completed registration with id 0
Pythonプログラム側
インポート
>>> import ipyparallel as ipp
ipengineの稼働を確認
ipengineを実行したときの[IPEngineApp] Completed registration with id 0
で登録されたidが表示される。
>>> c = ipp.Client() >>> print(c.ids) [0, 1, 2]
並列処理する関数の作成
irisデータを使って、Speciesの'setosa' 'versicolor' 'virginica'ごとにYes/Noを判定するロジスティック回帰モデルを作ってみる。
- 準備
import pandas as pd import numpy as np from sklearn.datasets import load_iris from sklearn.linear_model import LogisticRegressionCV from sklearn.metrics import roc_curve iris = load_iris()
- 判定対象のSpecies(0: 'setosa' 1: 'versicolor' 2: 'virginica')をインプットとしてモデルを作成する関数
def model_create(tgt): clf = LogisticRegressionCV(penalty='l2', solver='liblinear', scoring='neg_log_loss', Cs=10, cv=3) y = iris.target==tgt clf.fit(X=iris.data, y=y) clf.score(iris.data, y) y_pred = clf.predict_proba(iris.data)[:, 1] roc = roc_curve(y, y_pred) clf.set_params(scoring='roc_auc') auc_score = clf.score(iris.data, y) return({'target': tgt, 'auc_score': auc_score}
普通にforループ
>>> for i in range(3): ... print(model_create(i)) {'target': 0, 'auc_score': 1.0} {'target': 1, 'auc_score': 0.827} {'target': 2, 'auc_score': 0.9984}
並列化
コード
rc = ipp.Client() dview = rc[:] # 各engineへの直接的な操作を行う DirectView dview.push({'iris': iris} ) dview['LogisticRegressionCV'] = LogisticRegressionCV dview.execute("from sklearn.metrics import roc_curve") # dview.execute("import numpy as np") # dview.execute("import pandas as pd") lv = rc.load_balanced_view() rs = lv.map_async(model_create, [0,1,2]) rs.wait_interactive()
出力結果
並列処理の各結果をリストとして取得できる。
>>> rs.result() [{'target': 0, 'auc_score': 1.0}, {'target': 1, 'auc_score': 0.827}, {'target': 2, 'auc_score': 0.9984}]
ポイント
- 各engineに必要なオブジェクトを移すためには
DirectView
のpush
メソッドを使用する。- しかし、大きいデータフレームをこの方法で各engineに送ったとき、そのデータフレームは使用できなかった(「そんなデータフレームはない」というエラーが出た)。
- なので、前回記事でも書いたが、大きいデータは並列処理させる関数の中で作ったほうがよい。
dview.push({'iris': iris} )
dict
操作でもオブジェクトを移すことが可能
dview['iris'] = iris
- ただし、
class
はdict操作でのみ移すことが可能
dview['LogisticRegressionCV'] = LogisticRegressionCV
- インポートは
execute
メソッドを使用する。
dview.execute("from sklearn.metrics import roc_curve")
- 関数を
push
メソッド等で移した場合、エラーが発生することがある。roc_curve
の場合、下記の通り、キーワード引数のデフォルト値を認識しない。
>>> dview['roc_curve'] = roc_curve ... TypeError: roc_curve() missing 3 required keyword-only arguments: 'pos_label', 'sample_weight', and 'drop_intermediate'
- 並列タスク進捗状況を表示
>>> rs.wait_interactive() 1/3 tasks finished after 159 s # タスク進捗状況