のんびりしているエンジニアの日記

ソフトウェアなどのエンジニア的な何かを書きます。

Pythonで並列分散処理(multiprocess)

Sponsored Links

皆さんこんにちは
お元気ですか。私は元気です。

さて、今日はPythonで並列分散処理を行ってみましょう
今回使うライブラリはmultiprocessです。

まずは、引数を与えて出力をしてみましょう。

最も単純な例

#coding:utf-8
import multiprocessing
import os
def f(number):
    print number, os.getppid(),os.getpid()

if __name__ == '__main__':
	jobs = []
	for i in xrange(100):
		p = multiprocessing.Process(target=f, args=(i,))
		jobs.append(p)
		p.start()

	for job in jobs:
		job.join()

joinをすることにより待機することができます。
このプログラムは全てのプロセスが終了するまで出れません。
以下の出力は与えた番号、生成プロセス、実行プロセスです。

0 40627 40628
1 40627 40629
2 40627 40630
3 40627 40631
4 40627 40632
5 40627 40633
6 40627 40634
7 40627 40635
8 40627 40636
9 40627 40637

次に、マルチプロセスで実行したプロセスからまとまった結果を取得する方法です。以下のような感じですね。

Mapでの結果取得

#coding:utf-8
import multiprocessing

def fuga(x):
	return x*x

def hoge():
    p = multiprocessing.Pool(4)
    print p.map(fuga, range(10))

if __name__ == "__main__":
    hoge()

出力はリストで返って来ます。

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Mapに複数の引数を渡すとき

最後に複数の引数を渡すとき、実はmapでは、一つしか渡せません
tupleを使って、一つに見せてwrapper関数に渡して、それを実行したい関数に引数として渡します。
以下の例だと、wrapper_fugaがwrapper関数でその中でfugaを実行しています。

#coding:utf-8
import multiprocessing

def fuga(x,y):
	return x*y

def hoge():
    p = multiprocessing.Pool(4)
    data = [(fuga,i+2,i) for i in xrange(10)]
    print p.map(wrapper_fuga, data)

def wrapper_fuga(tuple_data):
	#print tuple_data
	return tuple_data[0](tuple_data[1],tuple_data[2])

if __name__ == "__main__":
    hoge()

こんな感じで並列処理を実行することができます。
長いプログラムを実行するときにうまく利用できるといいですね!