R, Python, DB 備忘録

データベースとか、jupyter(Python)、Rとか色々

Python+jupyterの並列処理

つい最近Pythonの並列処理について書いたばかりなんですが
rpy.hatenablog.com

jupyter上で並列処理ができるipyparallelを使ってみたので、導入から簡単な使い方までまとめました。

  • ipyparallelは、単一マシン上のマルチプロセスだけでなく、分散コンピューティングにも対応(むしろそっちがメイン?)

※ただ、結論から言うとナシかなあと。あまりお手軽でもないし、前回書いた方法と比較しても、jupyterで動く以上のメリットを感じませんでした。
※※magic commandなどにも慣れていけばまた違うのかも。

前提

仮想環境myenvで動かすプログラムについて、並列化したい。

インストール

仮想環境にipyparallelをインストール

(myenv-*****)>pipenv install ipyparallel

並列処理エンジンの稼働

仮想環境でipcontrolleripengine

(myenv-*****)>ipcontroller
2020-07-15 12:37:06.256 [IPControllerApp] Hub listening on tcp://127.0.0.1:54471 for registration.
2020-07-15 12:37:06.306 [IPControllerApp] Hub using DB backend: 'DictDB'
2020-07-15 12:37:06.564 [IPControllerApp] hub::created hub
2020-07-15 12:37:06.565 [IPControllerApp] writing connection info to C:\Users\~~~~~\.ipython\profile_default\security\ipcontroller-client.json
2020-07-15 12:37:06.568 [IPControllerApp] writing connection info to C:\Users\~~~~~\.ipython\profile_default\security\ipcontroller-engine.json
2020-07-15 12:37:06.572 [IPControllerApp] task::using Python leastload Task scheduler
2020-07-15 12:37:06.573 [IPControllerApp] Heartmonitor started
2020-07-15 12:37:06.647 [IPControllerApp] Creating pid file: C:\Users\~~~~~\.ipython\profile_default\pid\ipcontroller.pid
2020-07-15 12:37:09.210 [scheduler] Scheduler started [leastload]
2020-07-15 12:37:09.212 [IPControllerApp] client::client b'\x00\x80\x00\x00)' requested 'connection_request'
2020-07-15 12:37:09.213 [IPControllerApp] client::client [b'\x00\x80\x00\x00)'] connected
2020-07-15 12:38:33.386 [IPControllerApp] client::client b'00a55305-095b357100421a90794cf1d4' requested 'registration_request'
2020-07-15 12:38:36.578 [IPControllerApp] registration::finished registering engine 0:00a55305-095b357100421a90794cf1d4

ipengineは、稼働させる数だけ別窓を開いて実行

(myenv-*****)>ipengine
2020-07-15 14:21:26.901 [IPEngineApp] Loading url_file 'C:\\Users\\~~~~~\\.ipython\\profile_default\\security\\ipcontroller-engine.json'
2020-07-15 14:21:26.911 [IPEngineApp] Registering with controller at tcp://127.0.0.1:54471
2020-07-15 14:21:27.193 [IPEngineApp] Starting to monitor the heartbeat signal from the hub every 3010 ms.
2020-07-15 14:21:27.228 [IPEngineApp] Completed registration with id 0

Pythonプログラム側

インポート

>>> import ipyparallel as ipp

ipengineの稼働を確認

ipengineを実行したときの[IPEngineApp] Completed registration with id 0で登録されたidが表示される。

>>> c = ipp.Client()
>>> print(c.ids)
[0, 1, 2]

並列処理する関数の作成

irisデータを使って、Speciesの'setosa' 'versicolor' 'virginica'ごとにYes/Noを判定するロジスティック回帰モデルを作ってみる。

  • 準備
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegressionCV
from sklearn.metrics import roc_curve
iris = load_iris()
  • 判定対象のSpecies(0: 'setosa' 1: 'versicolor' 2: 'virginica')をインプットとしてモデルを作成する関数
def model_create(tgt):
    clf = LogisticRegressionCV(penalty='l2', solver='liblinear', scoring='neg_log_loss', Cs=10, cv=3)
    y = iris.target==tgt
    clf.fit(X=iris.data, y=y)
    clf.score(iris.data, y)
    y_pred = clf.predict_proba(iris.data)[:, 1]
    roc = roc_curve(y, y_pred)
    clf.set_params(scoring='roc_auc')
    auc_score = clf.score(iris.data, y)
    
    return({'target': tgt, 'auc_score': auc_score}

普通にforループ

>>> for i in range(3):
...    print(model_create(i))
{'target': 0, 'auc_score': 1.0}
{'target': 1, 'auc_score': 0.827}
{'target': 2, 'auc_score': 0.9984}

並列化

コード
rc = ipp.Client()
dview = rc[:]     # 各engineへの直接的な操作を行う DirectView
dview.push({'iris': iris} )
dview['LogisticRegressionCV'] = LogisticRegressionCV
dview.execute("from sklearn.metrics import roc_curve")
# dview.execute("import numpy as np")
# dview.execute("import pandas as pd")

lv = rc.load_balanced_view()
rs = lv.map_async(model_create, [0,1,2])
rs.wait_interactive()
出力結果

並列処理の各結果をリストとして取得できる。

>>> rs.result()
[{'target': 0, 'auc_score': 1.0},
 {'target': 1, 'auc_score': 0.827},
 {'target': 2, 'auc_score': 0.9984}]
ポイント
  • 各engineに必要なオブジェクトを移すためにはDirectViewpushメソッドを使用する。
    • しかし、大きいデータフレームをこの方法で各engineに送ったとき、そのデータフレームは使用できなかった(「そんなデータフレームはない」というエラーが出た)。
    • なので、前回記事でも書いたが、大きいデータは並列処理させる関数の中で作ったほうがよい。
dview.push({'iris': iris} )
  • dict操作でもオブジェクトを移すことが可能
dview['iris'] = iris
  • ただし、classはdict操作でのみ移すことが可能
dview['LogisticRegressionCV'] = LogisticRegressionCV
  • インポートはexecuteメソッドを使用する。
dview.execute("from sklearn.metrics import roc_curve")
  • 関数をpushメソッド等で移した場合、エラーが発生することがある。
    • roc_curveの場合、下記の通り、キーワード引数のデフォルト値を認識しない。
>>> dview['roc_curve'] = roc_curve
...
TypeError: roc_curve() missing 3 required keyword-only arguments: 'pos_label', 'sample_weight', and 'drop_intermediate'
  • 並列タスク進捗状況を表示
>>> rs.wait_interactive()
1/3 tasks finished after 159 s     # タスク進捗状況