KZKY memo

Personal notes.

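Python multiprocessing

Building a WorkerPool with Python multiprocessing.

I find multiprocessing.Pool inconvenient because the function it runs for each task has to be a global (module-level) function. Pool pickles the callable to send it to the worker processes, so lambdas and nested functions cannot be passed, and in Python 2 neither can bound methods.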
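The restriction comes from pickle, which stores a function only as a reference to its module and name. A minimal check, assuming Python 3 (the names `square`, `Scaler`, `make_local` and `is_picklable` are illustrative, not from the original). Note that on Python 3 a bound method of a picklable object does pickle, which was not the case on Python 2:

```python
import pickle


def square(x):
    """Module-level function: pickled as a reference to __main__.square."""
    return x * x


class Scaler(object):
    """Module-level class, so its instances and bound methods can be pickled."""

    def __init__(self, factor):
        self.factor = factor

    def scale(self, x):
        return x * self.factor


def make_local():
    def local_square(x):  # nested function: not reachable by a module-level name
        return x * x
    return local_square


def is_picklable(obj):
    """Return True if pickle can serialize obj, which Pool requires of its callable."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


if __name__ == "__main__":
    print(is_picklable(square))           # True
    print(is_picklable(lambda x: x * x))  # False
    print(is_picklable(make_local()))     # False
    print(is_picklable(Scaler(3).scale))  # True on Python 3, False on Python 2
```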

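I want to write three classes,

  • WorkerPool
  • Worker
  • Task

and do multiprocessing in the style of Java's ExecutorService (java.util.concurrent).
Below are the design guidelines for that and an example.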

WorkerPool

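  • Creates the Workers
  • Holds the task queue and the result queue
  • Tasks are handed to the Workers through an instance of this class
  • When a Task finishes, its result (the completion notice) is received through an instance of this class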

Worker

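  • Subclasses multiprocessing.Process
  • Holds the task queue and the result queue (the same ones as the WorkerPool)
  • Overrides the run method to take Tasks from the task queue and execute them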

Task

  • Plays the role of Callable in Java's java.util.concurrent
  • Executed by a Worker
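Java's Callable<V> is an interface with a single call() method that returns a value. A rough Python analogue, assuming an abstract base class is wanted (the sample code below uses a plain class instead; `CallableTask` and `SquareTask` are hypothetical names), makes do_task() mandatory for every concrete task:

```python
from abc import ABC, abstractmethod


class CallableTask(ABC):
    """Abstract task: the counterpart of Java's Callable<V>.call()."""

    @abstractmethod
    def do_task(self):
        """Run the task in a worker process and return a picklable result."""


class SquareTask(CallableTask):
    """Concrete task: carries its input as instance state, like a Callable's fields."""

    def __init__(self, x):
        self.x = x

    def do_task(self):
        return self.x * self.x


if __name__ == "__main__":
    print(SquareTask(7).do_task())  # 49
    try:
        CallableTask()  # an abstract class cannot be instantiated
    except TypeError as e:
        print("TypeError:", e)
```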

Tips

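To benefit from multiple cores in Python, use multiprocessing: in CPython the GIL lets only one thread execute Python bytecode at a time, so threads do not speed up CPU-bound work.
Passing large data between processes is inefficient, because everything sent through a Queue is pickled and copied into the other process. It is probably better to pass a path to the data instead, and have each Worker load the data into memory itself and do its task there.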
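The cost difference can be seen from the size of the pickled task, since that is what goes through the queue. A minimal sketch, assuming the data lives in a text file and the task is a line count (`InlineDataTask` and `LoadFromPathTask` are hypothetical names):

```python
import os
import pickle
import tempfile


class InlineDataTask(object):
    """Carries the data itself: all of it is pickled and copied to the worker."""

    def __init__(self, lines):
        self.lines = lines

    def do_task(self):
        return len(self.lines)


class LoadFromPathTask(object):
    """Carries only a path: the worker reads the file into its own memory."""

    def __init__(self, path):
        self.path = path

    def do_task(self):
        with open(self.path) as f:
            return sum(1 for _ in f)


if __name__ == "__main__":
    lines = ["line %d\n" % i for i in range(100000)]
    fd, path = tempfile.mkstemp(suffix=".txt")
    with os.fdopen(fd, "w") as f:
        f.writelines(lines)

    inline_task = InlineDataTask(lines)
    path_task = LoadFromPathTask(path)
    # Bytes that would be sent through the task queue for each task.
    print(len(pickle.dumps(inline_task)), "bytes vs", len(pickle.dumps(path_task)), "bytes")
    # Both tasks compute the same result; only the transfer cost differs.
    print(inline_task.do_task(), path_task.do_task())
    os.remove(path)
```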

Sample code.

Worker Pool

import multiprocessing
from multiprocessing import Process
from multiprocessing import Queue


class WorkerPool(object):
    """
    Worker pool: starts the workers and owns the task and result queues.
    """
    NUM_WORKERS = multiprocessing.cpu_count()

    def __init__(self, task_queue=None, result_queue=None, num_workers=NUM_WORKERS):
        """
        Arguments:
        - `task_queue`: queue the workers take Tasks from (a new Queue if None)
        - `result_queue`: queue the workers put results on (a new Queue if None)
        - `num_workers`: number of worker processes (defaults to the CPU count)
        """
        self._task_queue = task_queue if task_queue is not None else Queue()
        self._result_queue = result_queue if result_queue is not None else Queue()

        self._num_workers = num_workers
        self._workers = []

        self._start_workers()

    def put(self, task):
        """
        Put a task on the task queue; the next idle worker picks it up.
        Arguments:
        - `task`: a Task instance (it must be picklable)
        """
        self._task_queue.put(task)

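    def get(self):
        """
        Get a result from the result queue, blocking until one is available.
        Results come back in completion order, not in the order tasks were put.
        """
        return self._result_queue.get()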

    def _start_workers(self):
        """
        Start self._num_workers workers and append them to self._workers
        """
        for _ in range(self._num_workers):
            worker = Worker(self._task_queue, self._result_queue)
            self._workers.append(worker)
            worker.start()

    def terminate(self):
        """
        Terminate the workers and close the queues
        """
        for worker in self._workers:
            worker.terminate()
            worker.join()  # wait until the killed process has actually exited

        self._result_queue.close()
        self._task_queue.close()

Worker

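class Worker(Process):
    """
    Worker process: repeatedly takes a Task from the task queue, runs it,
    and puts the return value on the result queue.
    """

    def __init__(self, task_queue, result_queue):
        """
        Arguments:
        - `task_queue`: queue shared with the pool; Tasks are taken from it
        - `result_queue`: queue shared with the pool; results are put on it
        """
        super(Worker, self).__init__()

        self._task_queue = task_queue
        self._result_queue = result_queue

    def run(self):
        """
        Main loop of the worker; it runs until the process is terminated.
        """
        while True:
            task = self._task_queue.get()  # blocks until a task is available
            # Note: an exception raised here ends this worker, and the
            # missing result would make pool.get() wait forever.
            ret = task.do_task()
            self._result_queue.put(ret)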
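As written, run() loops forever and the pool has to kill the workers with terminate(); the multiprocessing documentation warns that terminating a process while it is using a queue can corrupt that queue. A common alternative, sketched here as an assumption rather than part of the original design, is a sentinel ("poison pill"): the pool puts one `None` per worker on the task queue, and each worker leaves its loop when it takes one, so the workers exit normally and can be join()ed. `StoppableWorker`, `EchoTask` and `run_pool` are hypothetical names:

```python
from multiprocessing import Process, Queue

STOP = None  # sentinel: one per worker tells it to leave its loop


class StoppableWorker(Process):
    """Worker that exits cleanly when it takes the STOP sentinel from the task queue."""

    def __init__(self, task_queue, result_queue):
        super(StoppableWorker, self).__init__()
        self._task_queue = task_queue
        self._result_queue = result_queue

    def run(self):
        while True:
            task = self._task_queue.get()  # blocks until a task is available
            if task is STOP:
                break  # leave the loop; the process then exits normally
            self._result_queue.put(task.do_task())


class EchoTask(object):
    def __init__(self, name):
        self.name = name

    def do_task(self):
        return self.name + " is finished."


def run_pool(names, num_workers=2):
    """Start workers, feed tasks plus one STOP per worker, collect results, join."""
    task_queue, result_queue = Queue(), Queue()
    workers = [StoppableWorker(task_queue, result_queue) for _ in range(num_workers)]
    for w in workers:
        w.start()
    for name in names:
        task_queue.put(EchoTask(name))
    for _ in workers:
        task_queue.put(STOP)
    # Drain the results before join(): a process that still has buffered
    # queue data may not exit until that data has been consumed.
    results = [result_queue.get() for _ in names]
    for w in workers:
        w.join()
    return results


if __name__ == "__main__":
    print(sorted(run_pool(["task %d" % i for i in range(5)])))
```

Draining before joining follows the "joining processes that use queues" guideline in the multiprocessing documentation.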

Task

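class Task(object):
    """
    Task: the unit of work, comparable to Java's Callable.
    For real work, e.g. subclass it and override do_task().
    """

    def __init__(self, task_name=""):
        self.task_name = task_name

    def do_task(self):
        """
        Run the task and return its result (the result must be picklable).
        """
        return self.task_name + " is finished."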
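Because several workers take from the same task queue, results arrive in completion order, not in the order the tasks were put. One way to match them up, assuming each task is given an id (a hypothetical `task_id` attribute that the original Task does not have), is to return the id together with the result and reorder on the caller's side:

```python
class IndexedTask(object):
    """Task that tags its result with its id so the caller can restore the order."""

    def __init__(self, task_id, task_name=""):
        self.task_id = task_id
        self.task_name = task_name

    def do_task(self):
        # Return (id, result) instead of the bare result.
        return self.task_id, self.task_name + " is finished."


def in_submission_order(tagged_results):
    """Sort (task_id, result) pairs by id and drop the ids."""
    return [result for _, result in sorted(tagged_results)]


if __name__ == "__main__":
    tasks = [IndexedTask(i, "task %d" % i) for i in range(3)]
    # Simulate results arriving out of order from the result queue.
    arrived = [tasks[2].do_task(), tasks[0].do_task(), tasks[1].do_task()]
    print(in_submission_order(arrived))
    # ['task 0 is finished.', 'task 1 is finished.', 'task 2 is finished.']
```

With the WorkerPool, the idea is the same: put IndexedTask objects and pass the values returned by pool.get() to in_submission_order().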

main

def main():
    # worker pool with 4 worker processes
    pool = WorkerPool(num_workers=4)

    # put tasks
    print("put tasks")
    num_task = 25
    for i in range(num_task):
        pool.put(Task(task_name="task %d" % i))

    # get results (in completion order, which may differ from the order put)
    print("get results")
    for _ in range(num_task):
        print(pool.get())

    pool.terminate()

if __name__ == '__main__':
    main()
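For comparison: since Python 3.2 the standard library has concurrent.futures, whose design (PEP 3148) was influenced by Java's java.util.concurrent. ProcessPoolExecutor.submit() returns a Future, much like ExecutorService.submit(), and on Python 3 it accepts a bound method such as a task's do_task as long as the task object itself can be pickled. A sketch, assuming Python 3 (the Task class is repeated so the block runs on its own; `run_tasks` is a hypothetical helper):

```python
from concurrent.futures import ProcessPoolExecutor


class Task(object):
    """Same shape as the Task class in the sample code."""

    def __init__(self, task_name=""):
        self.task_name = task_name

    def do_task(self):
        return self.task_name + " is finished."


def run_tasks(tasks, max_workers=4):
    """Submit each task's bound do_task method; return results in submission order."""
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # submit() pickles the bound method (and with it the Task) and returns a Future.
        futures = [executor.submit(task.do_task) for task in tasks]
        # Future.result() blocks until that particular task is done.
        return [future.result() for future in futures]


if __name__ == "__main__":
    for line in run_tasks([Task("task %d" % i) for i in range(25)]):
        print(line)
```

Leaving the with block calls executor.shutdown(wait=True), which takes the place of the pool's terminate() but lets tasks that are already running finish.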