python multiprocessing
Python multiprocessingでWorkerPoolを作る.
multiprocessing.Poolはタスクの実行にグローバル関数しか渡せないのが不便だと思う.
- WorkerPool
- Worker
- Task
の3クラスを作ってJava ConcurrentのExecutorServiceのようにマルチプロセッシングしたい.
そうしたい時の設計方針と例.
WorkerPool
Worker
- Processクラスを継承
- Task QueueとResult Queueを持つ
- run methodをオーバーライドして,TaskをTaskQueueから取り出して実行
Task
- Java ConcurrentのCallableのイメージ
- Workerに実行される
Tips
pythonでmulticoreの恩恵を受けるには,multiprocessingを使う.
プロセス間で大きなデータを受け渡すのは非効率なので,データへのパスを渡すようにして,各Workerでデータをメモリに読み込み,何かしらのタスクをしたほうがいいと思われる.
サンプルコード.
Worker Pool
from multiprocessing import Process
from multiprocessing import Queue
import multiprocessing


class WorkerPool(object):
    """Fixed-size pool of Worker processes sharing two queues.

    Tasks handed to put() are placed on the task queue, and results are
    read back one at a time with get() from the result queue. The worker
    processes are created and started by the constructor.
    """

    # Default pool size: one worker per CPU reported by multiprocessing.
    NUM_WORKERS = multiprocessing.cpu_count()

    def __init__(self, task_queue=None, result_queue=None,
                 num_workers=NUM_WORKERS):
        """Set up the queues and start the workers.

        Arguments:
        - `task_queue`: queue the workers take tasks from; a new
          multiprocessing Queue is created when None
        - `result_queue`: queue the workers put results on; a new
          multiprocessing Queue is created when None
        - `num_workers`: number of Worker processes to start
        """
        self._task_queue = task_queue if task_queue is not None else Queue()
        self._result_queue = (
            result_queue if result_queue is not None else Queue()
        )
        self._num_workers = num_workers
        self._workers = []
        self._start_workers()

    def put(self, task):
        """Put a task on the task queue.

        Arguments:
        - `task`: object to be picked up by one of the workers
        """
        self._task_queue.put(task)

    def get(self):
        """Return the next item from the result queue.

        Blocks until a result is available.
        """
        return self._result_queue.get()

    def _start_workers(self):
        """Create, register and start `num_workers` Worker processes."""
        # range() behaves the same here on Python 2 and Python 3.
        for _ in range(self._num_workers):
            worker = Worker(self._task_queue, self._result_queue)
            self._workers.append(worker)
            worker.start()

    def terminate(self):
        """Terminate every worker, wait for it to exit, close the queues."""
        for worker in self._workers:
            worker.terminate()
        # Join after terminating so the child processes are reaped instead
        # of lingering as zombies until the parent exits.
        for worker in self._workers:
            worker.join()
        self._result_queue.close()
        self._task_queue.close()
Worker
class Worker(Process):
    """Process that executes tasks taken from a queue.

    Each task is an object exposing a do_task() method. Whatever
    do_task() returns is put on the result queue.
    """

    def __init__(self, task_queue, result_queue):
        """Remember the queues this worker reads from and writes to.

        Arguments:
        - `task_queue`: queue from which tasks are taken with get()
        - `result_queue`: queue on which results are placed with put()
        """
        super(Worker, self).__init__()
        self._task_queue = task_queue
        self._result_queue = result_queue

    def run(self):
        """Take tasks forever, run each one and publish its result.

        The loop has no exit condition of its own. If do_task() raises,
        the exception instance is published as that task's result, so the
        worker stays alive and every task still yields exactly one item on
        the result queue.
        """
        while True:
            task = self._task_queue.get()  # blocks until a task arrives
            try:
                ret = task.do_task()
            except Exception as exc:
                # Without this the worker would die and a consumer waiting
                # for one result per task would block forever.
                ret = exc
            self._result_queue.put(ret)
Task
class Task(object):
    """Sample unit of work identified by a name."""

    def __init__(self, task_name=""):
        """Store the name used in the completion message.

        Arguments:
        - `task_name`: label for this task (empty string by default)
        """
        self.task_name = task_name

    def do_task(self):
        """Return the task name followed by " is finished."."""
        suffix = " is finished."
        return self.task_name + suffix
main
def main():
    """Push 25 sample tasks through a 4-worker pool and print each result."""
    num_task = 25
    pool = WorkerPool(num_workers=4)
    try:
        # Single-argument print(...) produces the same output on
        # Python 2 and Python 3.
        print("put tasks")
        for i in range(num_task):
            pool.put(Task(task_name="task %d" % i))

        # Read back exactly one result per submitted task.
        print("get results")
        for _ in range(num_task):
            print(pool.get())
    finally:
        # Always stop the workers: they loop forever, so leaving them
        # running after an error would keep the program from exiting.
        pool.terminate()


if __name__ == '__main__':
    main()