Python threading for I/O-bound processing
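I have heard that, because of the GIL, Python's threading runs CPU-bound work in about the same time as serial execution, but that this is not the case for I/O-bound work. To check whether that is really true, I measured whether threading speeds up an I/O-bound task: decompressing zip files that are each several MB in size. I also compare the result with Ruby's Thread.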
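The claim above can be tried on a small scale before touching any real files. The following is a minimal sketch, not the benchmark used in this post: `time.sleep` stands in for a blocking I/O call (it releases the GIL while waiting), and a pure-Python loop stands in for CPU-bound work. The function names and the loop size are assumptions chosen for illustration.

```python
import threading
import time


def io_like_task():
    # time.sleep releases the GIL while waiting, as blocking I/O calls do
    time.sleep(0.2)


def cpu_task():
    # pure-Python arithmetic holds the GIL while it runs
    total = 0
    for i in range(200000):
        total += i * i
    return total


def run_serial(task, n):
    # call the task n times, one after another, and return the elapsed time
    start = time.time()
    for _ in range(n):
        task()
    return time.time() - start


def run_threaded(task, n):
    # run the task on n threads at once and return the elapsed time
    threads = [threading.Thread(target=task) for _ in range(n)]
    start = time.time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - start


if __name__ == "__main__":
    for name, task in [("I/O-like", io_like_task), ("CPU-bound", cpu_task)]:
        print("%s: serial %.2f s, 3 threads %.2f s"
              % (name, run_serial(task, 3), run_threaded(task, 3)))
```

With three threads, the waiting task should finish in roughly the time of a single wait, while the CPU-bound loop is not expected to gain much on CPython.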
Test conditions
- OS: Ubuntu 12.04
- CPU: Intel(R) Core(TM) i5-2540M CPU @ 2.60GHz
- Python: 2.7.3
- Ruby: 2.0.0p247
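- Number of files: 182
- Total size: 2.32 GB
- Mean file size: 12.74 MB
- Median file size: 9.45 MB
- Number of threads: 3
- To keep the disk cache from speeding up later runs, the page cache is dropped before each script execution: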
$ echo 1 > /proc/sys/vm/drop_caches
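If the cache is to be dropped from a script rather than by hand, something like the following could be used. This is a sketch under assumptions: `drop_page_cache` is a hypothetical helper name, it only works on Linux, and writing to the control file requires root. It flushes dirty pages first, because the kernel can only drop clean pages.

```python
import os
import subprocess

DROP_CACHES = "/proc/sys/vm/drop_caches"  # Linux only; writing requires root


def drop_page_cache(control_file=DROP_CACHES):
    """Flush dirty pages, then ask the kernel to drop the page cache."""
    # dirty pages cannot be dropped, so write them back to disk first
    if hasattr(os, "sync"):
        os.sync()  # available from Python 3.3
    else:
        subprocess.check_call(["sync"])  # fallback, e.g. for Python 2.7
    with open(control_file, "w") as f:
        f.write("1\n")  # 1 = page cache only, the same value as `echo 1`
```

Calling `drop_page_cache()` as root before each timed run would have the same effect as the `echo` command above.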
Python code
- serial
#!/usr/bin/python
# -*- coding: utf-8 -*-
import glob
import time
import zipfile

path = "/home/kzk/downloads/*.zip"
zipfiles = glob.glob(path)


# read-only unzip: decompress every member, write nothing to disk
def unzip(zfin):
    zf = zipfile.ZipFile(zfin, "r")
    for f in zf.namelist():  # a zip archive is assumed to bundle several files
        # print "unzip", f
        zf.read(f)  # reading only
    zf.close()


# non-threaded run for comparison
st = time.time()
for zf in zipfiles:
    unzip(zf)
et = time.time()
print "total execution time without threading: ", (et - st), "[s]"
- thread
#!/usr/bin/python
# -*- coding: utf-8 -*-
import threading
import glob
import time
import zipfile
import Queue  # http://docs.python.jp/2/library/queue.html#module-Queue


# read-only unzip: decompress every member, write nothing to disk
def unzip(zfin):
    zf = zipfile.ZipFile(zfin, "r")
    for f in zf.namelist():  # a zip archive is assumed to bundle several files
        # print "unzip", f
        zf.read(f)  # reading only
    zf.close()


# queue
## target expects a callable, not an arbitrary instance
## to run an object as a thread, subclass threading.Thread
queue = Queue.Queue()


# worker: take a path from the queue, read the archive, mark the task done
def worker():
    while True:
        path = queue.get()
        unzip(path)
        queue.task_done()


# put tasks
pattern = "/home/kzk/downloads/*.zip"
for path in glob.glob(pattern):
    queue.put(path)

# create/start threads
st = time.time()
num_worker_threads = 3
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.daemon = True  # daemon threads do not block interpreter exit
    t.start()

queue.join()  # wait until every queued path has been processed
et = time.time()
print "total execution time with threading: ", (et - st), "[s]"
# total execution time with threading: 60.7147810459 [s]
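The queue-and-worker setup can also be written more compactly with a thread pool. The sketch below is an alternative, not the code that was measured here: it uses `multiprocessing.pool.ThreadPool` from the standard library, and `unzip_all` is a hypothetical helper name. Unlike the scripts above, this `unzip` returns the number of bytes read so that the result can be checked.

```python
import glob
import time
import zipfile
from multiprocessing.pool import ThreadPool


def unzip(path):
    """Decompress every member of the archive and return the byte count."""
    total = 0
    zf = zipfile.ZipFile(path, "r")
    try:
        for name in zf.namelist():
            total += len(zf.read(name))  # reading only, nothing is written
    finally:
        zf.close()
    return total


def unzip_all(paths, num_threads=3):
    """Read all archives on a pool of threads; results keep the input order."""
    pool = ThreadPool(num_threads)
    try:
        # map blocks until every archive has been read
        return pool.map(unzip, paths)
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    paths = glob.glob("/home/kzk/downloads/*.zip")  # same pattern as above
    start = time.time()
    sizes = unzip_all(paths)
    print("read %d archives (%d bytes) in %.2f s"
          % (len(paths), sum(sizes), time.time() - start))
```

The pool handles queueing and joining internally, so no daemon threads or `task_done` bookkeeping are needed.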
Ruby code
- serial
#!/usr/bin/ruby
# -*- coding: utf-8 -*-
require "rubygems"
require "zipruby"

# unzip function
def unzip(path)
  Zip::Archive.open(path) do |archives|
    archives.each do |a|
      unless a.directory?
        ## reading only
        a.read
      end
    end
  end
end

# non-threaded run for comparison
path = "/home/kzk/downloads/*.zip"
st = Time.now
Dir.glob(path).each do |fpath|
  unzip(fpath)
end
et = Time.now
puts "total execution time without threading: #{et - st} [s]"
- thread
#!/usr/bin/ruby
# -*- coding: utf-8 -*-
require "rubygems"
require "zipruby"
require "thread"

# unzip function
def unzip(path)
  Zip::Archive.open(path) do |archives|
    archives.each do |a|
      unless a.directory?
        # reading only
        a.read
      end
    end
  end
end

# queue
queue = Queue.new

# push tasks
path = "/home/kzk/downloads/*.zip"
Dir.glob(path).each do |fpath|
  queue.push(fpath)
end

# create/start threads
num_threads = 3
st = Time.now
threads = (0..(num_threads - 1)).map do
  Thread.new do
    loop do
      begin
        # non-blocking pop raises ThreadError once the queue is empty
        fpath = queue.pop(true)
      rescue ThreadError
        break
      end
      unzip(fpath)
    end
  end
end

# join the workers so that the timer stops only after the last file is read
threads.each(&:join)
et = Time.now
puts "total execution time with threading: #{et - st} [s]"
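When timing a pool of workers, the clock should stop only after the last task has finished, not merely when the queue becomes empty. A minimal Python sketch of one way to do that follows: workers take tasks with a non-blocking get, exit once the queue is empty, and the main thread joins them. The names `run_workers` and `handler` are hypothetical and appear in neither script above.

```python
import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2


def run_workers(tasks, handler, num_threads=3):
    """Run handler(task) for every task on num_threads threads and wait."""
    q = queue.Queue()
    for task in tasks:
        q.put(task)

    def worker():
        while True:
            try:
                task = q.get_nowait()
            except queue.Empty:
                return  # nothing left to do: let the thread finish
            handler(task)

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # returns only after the last task has finished
```

Taking the end timestamp right after `run_workers(paths, unzip)` returns would then cover the files that were still being read when the queue ran dry.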