引出
首先需要了解的是threadpool 的用途,他更适合于用到一些大量的短任务合集,而非一些时间长的任务,换句话说,适合大量的CPU密集型短任务,那些消耗时间较长的IO密集型长任务适合用去解决。
目前,python 标准库(特指python2.X)中的threadpool模块是在 multiprocessing.pool.threadpool,或者multiprocessing.dummy.ThreadPool(dummy模块是针对threading 多线程的进一步封装)。该模块有个缺点就是在所有线程执行完之前无法强制退出。实现原理大同小异:实例化pool的时候会创建指定数目的线程,把task 传给一个task-queue,线程会读取task-queue 的task,没有就阻塞,读取到后就执行,并将结果交给一个result-queue。
除了标准库中的threadpool,还有一些使用比较多的threadpool,以下展开。
pip 中的 ThreadPool
安装简单:pip install threadpool
使用如下: 1 2 3 4 | pool = ThreadPool(poolsize) # 定义线程池,指定线程数量 requests = makeRequests(some_callable, list_of_args, callback) # 调用makeRequests创建了要开启多线程的函数,以及函数相关参数和回调函数 [pool.putRequest(req) for req in requests] # 所有要运行多线程的请求扔进线程池 pool.wait() # 等待所有线程完成后退出 |
原理类似,源码解读可以参考 ,该博客还给出了对其的一些优化。
自己定制 threadpool
根据需要的功能定制适合自己的threadpool 也是一种常见的手段,常用的功能比如:是否需要返回线程执行后的返回值,线程执行完之后销毁还是阻塞等等。以下为自己经常用的的一个比较简洁的threadpool,感谢@kaito-kidd提供,:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | # coding: utf8 """ 线程池,用于高效执行某些任务。 """ import Queue import threading class Task(threading.Thread): """ 任务 """ def __init__(self, num, input_queue, output_queue, error_queue): super(Task, self).__init__() self.thread_name = "thread-%s" % num self.input_queue = input_queue self.output_queue = output_queue self.error_queue = error_queue self.deamon = True def run(self): """run """ while 1: try: func, args = self.input_queue.get(block=False) except Queue.Empty: print "%s finished!" % self.thread_name break try: result = func(*args) except Exception as exc: self.error_queue.put((func.func_name, args, str(exc))) else: self.output_queue.put(result) class Pool(object): """ 线程池 """ def __init__(self, size): self.input_queue = Queue.Queue() self.output_queue = Queue.Queue() self.error_queue = Queue.Queue() self.tasks = [ Task(i, self.input_queue, self.output_queue, self.error_queue) for i in range(size) ] def add_task(self, func, args): """添加单个任务 """ if not isinstance(args, tuple): raise TypeError("args must be tuple type!") self.input_queue.put((func, args)) def add_tasks(self, tasks): """批量添加任务 """ if not isinstance(tasks, list): raise TypeError("tasks must be list type!") for func, args in tasks: self.add_task(func, args) def get_results(self): """获取执行结果集 """ while not self.output_queue.empty(): print "Result: ", self.output_queue.get() def get_errors(self): """获取执行失败的结果集 """ while not self.error_queue.empty(): func, args, error_info = self.error_queue.get() print "Error: func: %s, args : %s, error_info : %s" \ % (func.func_name, args, error_info) def run(self): """执行 """ for task in self.tasks: task.start() for task in self.tasks: task.join() def test(i): """test """ result = i * 10 return result def main(): """ main """ pool = Pool(size=5) pool.add_tasks([(test, (i,)) for i in range(100)]) pool.run() if __name__ == "__main__": main()
|